You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
211 lines
7.5 KiB
211 lines
7.5 KiB
import pytest |
|
import mock |
|
import pappyproxy |
|
|
|
from pappyproxy.mangle import async_mangle_request, async_mangle_response |
|
from pappyproxy.http import Request, Response |
|
from testutil import no_tcp, no_database, func_deleted, mock_deferred, mock_deep_save, fake_saving |
|
|
|
def retf(r): |
|
return False |
|
|
|
@pytest.fixture |
|
def ignore_edit(mocker): |
|
new_edit = mock.MagicMock() |
|
new_edit.return_value = mock_deferred(None) |
|
new_plugin = mock.MagicMock() |
|
new_plugin.return_value = new_edit |
|
mocker.patch('pappyproxy.plugin.plugin_by_name', new=new_plugin) |
|
|
|
@pytest.fixture |
|
def ignore_delete(mocker): |
|
new_os_remove = mock.MagicMock() |
|
mocker.patch('os.remove', new=new_os_remove) |
|
return new_os_remove |
|
|
|
@pytest.fixture(autouse=True) |
|
def no_logging(mocker): |
|
mocker.patch('pappyproxy.proxy.log') |
|
|
|
@pytest.fixture |
|
def req(): |
|
r = Request() |
|
r.start_line = 'GET / HTTP/1.1' |
|
r.host = 'www.ffffff.eeeeee' |
|
r.body = 'AAAA' |
|
return r |
|
|
|
@pytest.fixture |
|
def req_w_rsp(req): |
|
r = Response() |
|
r.start_line = 'HTTP/1.1 200 OK' |
|
r.headers['Test-Header'] = 'ABC123' |
|
r.body = 'AAAA' |
|
req.response = r |
|
return req |
|
|
|
@pytest.fixture |
|
def mock_tempfile(mocker): |
|
new_tfile_obj = mock.MagicMock() |
|
tfile_instance = mock.MagicMock() |
|
new_tfile_obj.return_value.__enter__.return_value = tfile_instance |
|
|
|
tfile_instance.name = 'mockTemporaryFile' |
|
mocker.patch('tempfile.NamedTemporaryFile', new=new_tfile_obj) |
|
|
|
new_open = mock.MagicMock() |
|
fake_file = mock.MagicMock(spec=file) |
|
new_open.return_value.__enter__.return_value = fake_file |
|
mocker.patch('__builtin__.open', new_open) |
|
|
|
return (new_tfile_obj, tfile_instance, new_open, fake_file) |
|
|
|
|
|
######################## |
|
## Test request mangling |
|
|
|
@pytest.inlineCallbacks |
|
def test_mangle_request_edit(req, mock_deep_save, mock_tempfile, |
|
ignore_edit, ignore_delete): |
|
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile |
|
r = req |
|
new_contents = ('GET / HTTP/1.1\r\n' |
|
'Content-Length: 4\r\n\r\n' |
|
'BBBB') |
|
fake_file.read.return_value = new_contents |
|
new_req = yield async_mangle_request(r) |
|
assert not mock_deep_save.called |
|
assert tfile_obj.called |
|
assert tfile_instance.write.called |
|
assert tfile_instance.write.call_args == ((r.full_request,),) |
|
assert new_open.called |
|
assert fake_file.read.called |
|
|
|
assert new_req.full_request == new_contents |
|
|
|
@pytest.inlineCallbacks |
|
def test_mangle_request_edit_newlines(req, mock_deep_save, mock_tempfile, |
|
ignore_edit, ignore_delete): |
|
# Intercepting is off, request in scope |
|
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile |
|
r = req |
|
new_contents = ('GET / HTTP/1.1\r\n' |
|
'Test-Head: FOOBIE\n' |
|
'Content-Length: 4\n\r\n' |
|
'BBBB') |
|
fake_file.read.return_value = new_contents |
|
new_req = yield async_mangle_request(r) |
|
|
|
assert new_req.full_request == ('GET / HTTP/1.1\r\n' |
|
'Test-Head: FOOBIE\r\n' |
|
'Content-Length: 4\r\n\r\n' |
|
'BBBB') |
|
assert new_req.headers['Test-Head'] == 'FOOBIE' |
|
|
|
@pytest.inlineCallbacks |
|
def test_mangle_request_drop(req, mock_deep_save, mock_tempfile, |
|
ignore_edit, ignore_delete): |
|
# Intercepting is off, request in scope |
|
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile |
|
r = req |
|
new_contents = '' |
|
fake_file.read.return_value = new_contents |
|
new_req = yield async_mangle_request(r) |
|
|
|
assert new_req is None |
|
|
|
@pytest.inlineCallbacks |
|
def test_mangle_request_edit_len(req, mock_deep_save, mock_tempfile, |
|
ignore_edit, ignore_delete): |
|
# Intercepting is off, request in scope |
|
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile |
|
r = req |
|
new_contents = ('GET / HTTP/1.1\r\n' |
|
'Test-Head: FOOBIE\n' |
|
'Content-Length: 4\n\r\n' |
|
'BBBBAAAA') |
|
fake_file.read.return_value = new_contents |
|
new_req = yield async_mangle_request(r) |
|
|
|
assert new_req.full_request == ('GET / HTTP/1.1\r\n' |
|
'Test-Head: FOOBIE\r\n' |
|
'Content-Length: 8\r\n\r\n' |
|
'BBBBAAAA') |
|
|
|
|
|
######################### |
|
## Test response mangling |
|
|
|
@pytest.inlineCallbacks |
|
def test_mangle_response_edit(req_w_rsp, mock_deep_save, mock_tempfile, |
|
ignore_edit, ignore_delete): |
|
# Intercepting is on, edit |
|
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile |
|
r = req_w_rsp |
|
old_rsp = r.response.full_response |
|
new_contents = ('HTTP/1.1 403 NOTOKIEDOKIE\r\n' |
|
'Content-Length: 4\r\n' |
|
'Other-Header: foobles\r\n\r\n' |
|
'BBBB') |
|
fake_file.read.return_value = new_contents |
|
mangled_rsp = yield async_mangle_response(r) |
|
assert not mock_deep_save.called |
|
assert tfile_obj.called |
|
assert tfile_instance.write.called |
|
assert tfile_instance.write.call_args == ((old_rsp,),) |
|
assert new_open.called |
|
assert fake_file.read.called |
|
|
|
assert mangled_rsp.full_response == new_contents |
|
|
|
@pytest.inlineCallbacks |
|
def test_mangle_response_newlines(req_w_rsp, mock_deep_save, mock_tempfile, |
|
ignore_edit, ignore_delete): |
|
# Intercepting is off, request in scope |
|
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile |
|
r = req_w_rsp |
|
old_rsp = r.response.full_response |
|
new_contents = ('HTTP/1.1 403 NOTOKIEDOKIE\n' |
|
'Content-Length: 4\n' |
|
'Other-Header: foobles\r\n\n' |
|
'BBBB') |
|
fake_file.read.return_value = new_contents |
|
mangled_rsp = yield async_mangle_response(r) |
|
|
|
assert mangled_rsp.full_response == ('HTTP/1.1 403 NOTOKIEDOKIE\r\n' |
|
'Content-Length: 4\r\n' |
|
'Other-Header: foobles\r\n\r\n' |
|
'BBBB') |
|
assert mangled_rsp.headers['Other-Header'] == 'foobles' |
|
|
|
@pytest.inlineCallbacks |
|
def test_mangle_response_drop(req_w_rsp, mock_deep_save, mock_tempfile, |
|
ignore_edit, ignore_delete): |
|
# Intercepting is off, request in scope |
|
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile |
|
r = req_w_rsp |
|
old_rsp = r.response.full_response |
|
new_contents = '' |
|
fake_file.read.return_value = new_contents |
|
mangled_rsp = yield async_mangle_response(r) |
|
|
|
assert mangled_rsp is None |
|
|
|
@pytest.inlineCallbacks |
|
def test_mangle_response_new_len(req_w_rsp, mock_deep_save, mock_tempfile, |
|
ignore_edit, ignore_delete): |
|
# Intercepting is off, request in scope |
|
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile |
|
r = req_w_rsp |
|
old_rsp = r.response.full_response |
|
new_contents = ('HTTP/1.1 403 NOTOKIEDOKIE\n' |
|
'Content-Length: 4\n' |
|
'Other-Header: foobles\r\n\n' |
|
'BBBBAAAA') |
|
fake_file.read.return_value = new_contents |
|
mangled_rsp = yield async_mangle_response(r) |
|
|
|
assert mangled_rsp.full_response == ('HTTP/1.1 403 NOTOKIEDOKIE\r\n' |
|
'Content-Length: 8\r\n' |
|
'Other-Header: foobles\r\n\r\n' |
|
'BBBBAAAA')
|
|
|