A fork of pappy proxy
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

124 lines
4.4 KiB

import base64
import pytest
import mock
import json
import datetime
import pappyproxy
from pappyproxy.util import PappyException
from pappyproxy.comm import CommServer
from pappyproxy.http import Request, Response
from testutil import mock_deferred, func_deleted, TLSStringTransport, freeze, mock_int_macro, no_tcp
@pytest.fixture(autouse=True)
def no_int_macros(mocker):
    """Autouse fixture: make active_intercepting_macros() return an empty dict.

    Patches ``pappyproxy.plugin.active_intercepting_macros`` for every test in
    this module so the patched callable reports no intercepting macros.
    """
    patched = mocker.patch('pappyproxy.plugin.active_intercepting_macros')
    patched.return_value = {}
@pytest.fixture
def http_request():
    """Return a minimal GET request with an attached 200 OK response.

    The request targets host ``www.foo.faketld`` on port ``'1337'`` (kept as a
    string), is flagged as SSL and carries request id 123.
    """
    request = Request('GET / HTTP/1.1\r\n\r\n')
    request.host = 'www.foo.faketld'
    request.port = '1337'
    request.is_ssl = True
    request.reqid = 123
    request.response = Response('HTTP/1.1 200 OK\r\n\r\n')
    return request
def perform_comm(line):
    """Send one line to a fresh CommServer and return what it wrote back.

    A new ``CommServer`` is given a ``TLSStringTransport`` as its transport
    and handed ``line`` through ``lineReceived``. The helper then polls the
    transport until its value is no longer the empty string.

    Raises:
        Exception: if the transport is still empty after more than 5 seconds.
    """
    server = CommServer()
    server.transport = TLSStringTransport()
    server.lineReceived(line)

    started = datetime.datetime.now()
    while server.transport.value() == '':
        elapsed = datetime.datetime.now() - started
        if elapsed.total_seconds() > 5:
            raise Exception("Request timed out")
    return server.transport.value()
def test_simple():
    """A ``ping`` action is answered with ``pong`` and a success flag."""
    reply = json.loads(perform_comm('{"action": "ping"}'))
    assert reply == {'ping': 'pong', 'success': True}
def mock_loader(rsp):
    """Build a classmethod stand-in that ignores its arguments and returns ``rsp``.

    Intended to be installed with ``mocker.patch.object(..., new=...)`` in
    place of a loader classmethod.
    """
    return classmethod(lambda *args, **kwargs: rsp)
def mock_submitter(rsp):
    """Build a classmethod stand-in for a request submitter.

    The replacement attaches ``rsp`` as the response of the request it is
    given, sets the request's ``reqid`` to 123, and returns the request
    wrapped by ``mock_deferred``.
    """
    def _submit(cls, req, *args, **kwargs):
        # The class and any extra arguments are accepted but unused.
        req.response = rsp
        req.reqid = 123
        return mock_deferred(req)

    return classmethod(_submit)
def mock_loader_fail():
    """Build a classmethod stand-in that always raises ``PappyException``.

    Used to simulate a loader or submitter that cannot find or process its
    target, whatever arguments it is called with.
    """
    def _always_fail(*args, **kwargs):
        raise PappyException("lololo message don't exist dawg")

    return classmethod(_always_fail)
def test_get_request(mocker, http_request):
    """``get_request`` returns the loaded request's JSON plus ``success``."""
    mocker.patch.object(
        pappyproxy.http.Request,
        'load_request',
        new=mock_loader(http_request),
    )

    raw_reply = perform_comm('{"action": "get_request", "reqid": "1"}')

    # Expected payload is the request's own JSON form with the success flag added.
    expected = json.loads(http_request.to_json())
    expected['success'] = True
    assert json.loads(raw_reply) == expected
def test_get_request_fail(mocker, http_request):
    """``get_request`` reports failure when loading the request raises.

    ``Request.load_request`` is replaced with a loader that raises
    ``PappyException``; the reply must carry ``success`` set to the JSON
    boolean ``false`` and include a ``message`` key.
    """
    mocker.patch.object(pappyproxy.http.Request, 'load_request', new=mock_loader_fail())
    v = json.loads(perform_comm('{"action": "get_request", "reqid": "1"}'))
    # Identity check: ``== False`` would also accept 0, which is not a valid flag.
    assert v['success'] is False
    assert 'message' in v
def test_get_response(mocker, http_request):
    """``get_response`` returns the loaded response's JSON plus ``success``."""
    response = http_request.response
    mocker.patch.object(
        pappyproxy.http.Request,
        'load_request',
        new=mock_loader(http_request),
    )
    mocker.patch.object(
        pappyproxy.http.Response,
        'load_response',
        new=mock_loader(response),
    )

    raw_reply = perform_comm('{"action": "get_response", "reqid": "1"}')

    # Expected payload is the response's own JSON form with the success flag added.
    expected = json.loads(http_request.response.to_json())
    expected['success'] = True
    assert json.loads(raw_reply) == expected
def test_get_response_fail(mocker, http_request):
    """``get_response`` reports failure when loading the response raises.

    ``Request.load_request`` still yields the fixture request, but
    ``Response.load_response`` is replaced with a loader that raises
    ``PappyException``; the reply must carry ``success`` set to the JSON
    boolean ``false`` and include a ``message`` key.
    """
    mocker.patch.object(pappyproxy.http.Request, 'load_request', new=mock_loader(http_request))
    mocker.patch.object(pappyproxy.http.Response, 'load_response', new=mock_loader_fail())
    v = json.loads(perform_comm('{"action": "get_response", "reqid": "1"}'))
    # Identity check: ``== False`` would also accept 0, which is not a valid flag.
    assert v['success'] is False
    assert 'message' in v
def test_submit_request(mocker, http_request):
    """``submit`` replies with the request, its response, and the given tags.

    ``Request.submit_request`` is replaced by a submitter that attaches a
    canned 200 OK response, and ``Request.async_deep_save`` is patched to
    return a mock deferred.
    """
    canned_response = Response('HTTP/1.1 200 OK\r\n\r\n')
    mocker.patch.object(
        pappyproxy.http.Request,
        'submit_request',
        new=mock_submitter(canned_response),
    )
    save_patch = mocker.patch('pappyproxy.http.Request.async_deep_save')
    save_patch.return_value = mock_deferred()

    # The full message travels base64-encoded inside the JSON command.
    command = {
        "action": "submit",
        'host': http_request.host,
        'port': http_request.port,
        'is_ssl': http_request.is_ssl,
        'full_message': base64.b64encode(http_request.full_message),
        'tags': ['footag'],
    }
    raw_reply = perform_comm(json.dumps(command))

    expected = {
        u'request': json.loads(http_request.to_json()),
        u'response': json.loads(http_request.response.to_json()),
        u'success': True,
    }
    expected[u'request'][u'tags'] = [u'footag']
    assert json.loads(raw_reply) == expected
def test_submit_request_fail(mocker, http_request):
    """``submit`` reports failure instead of a request/response pair.

    ``Request.submit_request`` is replaced with a callable that raises
    ``PappyException`` and ``Request.async_deep_save`` is patched to return a
    mock deferred. The command carries only ``action`` and a base64-encoded
    ``full_message``; the reply must have ``success`` set to the JSON boolean
    ``false`` and include a ``message`` key.

    NOTE(review): the command omits host/port/is_ssl and the message is not a
    well-formed request, so the failure may originate before the patched
    submitter is reached -- confirm which path this is meant to cover.
    """
    mocker.patch.object(pappyproxy.http.Request, 'submit_request', new=mock_loader_fail())
    mocker.patch('pappyproxy.http.Request.async_deep_save').return_value = mock_deferred()
    comm_data = {"action": "submit"}
    comm_data['full_message'] = base64.b64encode('HELLO THIS IS REQUEST\r\nWHAT IS HEADER FORMAT\r\n')
    v = json.loads(perform_comm(json.dumps(comm_data)))
    # Identity check: ``== False`` would also accept 0, which is not a valid flag.
    assert v['success'] is False
    assert 'message' in v