You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
124 lines
4.4 KiB
124 lines
4.4 KiB
import base64 |
|
import pytest |
|
import mock |
|
import json |
|
import datetime |
|
import pappyproxy |
|
|
|
from pappyproxy.util import PappyException |
|
from pappyproxy.comm import CommServer |
|
from pappyproxy.http import Request, Response |
|
from testutil import mock_deferred, func_deleted, TLSStringTransport, freeze, mock_int_macro, no_tcp |
|
|
|
@pytest.fixture(autouse=True) |
|
def no_int_macros(mocker): |
|
mocker.patch('pappyproxy.plugin.active_intercepting_macros').return_value = {} |
|
|
|
@pytest.fixture |
|
def http_request(): |
|
req = Request('GET / HTTP/1.1\r\n\r\n') |
|
req.host = 'www.foo.faketld' |
|
req.port = '1337' |
|
req.is_ssl = True |
|
req.reqid = 123 |
|
|
|
rsp = Response('HTTP/1.1 200 OK\r\n\r\n') |
|
req.response = rsp |
|
return req |
|
|
|
def perform_comm(line): |
|
serv = CommServer() |
|
serv.transport = TLSStringTransport() |
|
serv.lineReceived(line) |
|
n = datetime.datetime.now() |
|
while serv.transport.value() == '': |
|
t = datetime.datetime.now() |
|
if (t-n).total_seconds() > 5: |
|
raise Exception("Request timed out") |
|
return serv.transport.value() |
|
|
|
def test_simple(): |
|
v = perform_comm('{"action": "ping"}') |
|
assert json.loads(v) == {'ping': 'pong', 'success': True} |
|
|
|
def mock_loader(rsp): |
|
def f(*args, **kwargs): |
|
return rsp |
|
return classmethod(f) |
|
|
|
def mock_submitter(rsp): |
|
def f(_, req, *args, **kwargs): |
|
req.response = rsp |
|
req.reqid = 123 |
|
return mock_deferred(req) |
|
return classmethod(f) |
|
|
|
def mock_loader_fail(): |
|
def f(*args, **kwargs): |
|
raise PappyException("lololo message don't exist dawg") |
|
return classmethod(f) |
|
|
|
def test_get_request(mocker, http_request): |
|
mocker.patch.object(pappyproxy.http.Request, 'load_request', new=mock_loader(http_request)) |
|
v = perform_comm('{"action": "get_request", "reqid": "1"}') |
|
|
|
expected_data = json.loads(http_request.to_json()) |
|
expected_data['success'] = True |
|
assert json.loads(v) == expected_data |
|
|
|
def test_get_request_fail(mocker, http_request): |
|
mocker.patch.object(pappyproxy.http.Request, 'load_request', new=mock_loader_fail()) |
|
v = json.loads(perform_comm('{"action": "get_request", "reqid": "1"}')) |
|
|
|
assert v['success'] == False |
|
assert 'message' in v |
|
|
|
def test_get_response(mocker, http_request): |
|
mocker.patch.object(pappyproxy.http.Request, 'load_request', new=mock_loader(http_request)) |
|
mocker.patch.object(pappyproxy.http.Response, 'load_response', new=mock_loader(http_request.response)) |
|
v = perform_comm('{"action": "get_response", "reqid": "1"}') |
|
|
|
expected_data = json.loads(http_request.response.to_json()) |
|
expected_data['success'] = True |
|
assert json.loads(v) == expected_data |
|
|
|
def test_get_response_fail(mocker, http_request): |
|
mocker.patch.object(pappyproxy.http.Request, 'load_request', new=mock_loader(http_request)) |
|
mocker.patch.object(pappyproxy.http.Response, 'load_response', new=mock_loader_fail()) |
|
v = json.loads(perform_comm('{"action": "get_response", "reqid": "1"}')) |
|
|
|
assert v['success'] == False |
|
assert 'message' in v |
|
|
|
def test_submit_request(mocker, http_request): |
|
rsp = Response('HTTP/1.1 200 OK\r\n\r\n') |
|
mocker.patch.object(pappyproxy.http.Request, 'submit_request', new=mock_submitter(rsp)) |
|
mocker.patch('pappyproxy.http.Request.async_deep_save').return_value = mock_deferred() |
|
|
|
comm_data = {"action": "submit"} |
|
comm_data['host'] = http_request.host |
|
comm_data['port'] = http_request.port |
|
comm_data['is_ssl'] = http_request.is_ssl |
|
comm_data['full_message'] = base64.b64encode(http_request.full_message) |
|
comm_data['tags'] = ['footag'] |
|
v = perform_comm(json.dumps(comm_data)) |
|
|
|
expected_data = {} |
|
expected_data[u'request'] = json.loads(http_request.to_json()) |
|
expected_data[u'response'] = json.loads(http_request.response.to_json()) |
|
expected_data[u'success'] = True |
|
expected_data[u'request'][u'tags'] = [u'footag'] |
|
assert json.loads(v) == expected_data |
|
|
|
def test_submit_request_fail(mocker, http_request): |
|
mocker.patch.object(pappyproxy.http.Request, 'submit_request', new=mock_loader_fail()) |
|
mocker.patch('pappyproxy.http.Request.async_deep_save').return_value = mock_deferred() |
|
|
|
comm_data = {"action": "submit"} |
|
comm_data['full_message'] = base64.b64encode('HELLO THIS IS REQUEST\r\nWHAT IS HEADER FORMAT\r\n') |
|
v = json.loads(perform_comm(json.dumps(comm_data))) |
|
print v |
|
|
|
assert v['success'] == False |
|
assert 'message' in v |
|
|
|
|