Version 0.2.0
This commit is contained in:
parent
312b985229
commit
26376eaaec
43 changed files with 4699 additions and 2668 deletions
211
pappyproxy/tests/old_test_mangle.py
Normal file
211
pappyproxy/tests/old_test_mangle.py
Normal file
|
@ -0,0 +1,211 @@
|
|||
import pytest
|
||||
import mock
|
||||
import pappyproxy
|
||||
|
||||
from pappyproxy.mangle import async_mangle_request, async_mangle_response
|
||||
from pappyproxy.http import Request, Response
|
||||
from testutil import no_tcp, no_database, func_deleted, mock_deferred, mock_deep_save, fake_saving
|
||||
|
||||
def retf(r):
|
||||
return False
|
||||
|
||||
@pytest.fixture
|
||||
def ignore_edit(mocker):
|
||||
new_edit = mock.MagicMock()
|
||||
new_edit.return_value = mock_deferred(None)
|
||||
new_plugin = mock.MagicMock()
|
||||
new_plugin.return_value = new_edit
|
||||
mocker.patch('pappyproxy.plugin.plugin_by_name', new=new_plugin)
|
||||
|
||||
@pytest.fixture
|
||||
def ignore_delete(mocker):
|
||||
new_os_remove = mock.MagicMock()
|
||||
mocker.patch('os.remove', new=new_os_remove)
|
||||
return new_os_remove
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def no_logging(mocker):
|
||||
mocker.patch('pappyproxy.proxy.log')
|
||||
|
||||
@pytest.fixture
|
||||
def req():
|
||||
r = Request()
|
||||
r.start_line = 'GET / HTTP/1.1'
|
||||
r.host = 'www.ffffff.eeeeee'
|
||||
r.body = 'AAAA'
|
||||
return r
|
||||
|
||||
@pytest.fixture
|
||||
def req_w_rsp(req):
|
||||
r = Response()
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
r.headers['Test-Header'] = 'ABC123'
|
||||
r.body = 'AAAA'
|
||||
req.response = r
|
||||
return req
|
||||
|
||||
@pytest.fixture
|
||||
def mock_tempfile(mocker):
|
||||
new_tfile_obj = mock.MagicMock()
|
||||
tfile_instance = mock.MagicMock()
|
||||
new_tfile_obj.return_value.__enter__.return_value = tfile_instance
|
||||
|
||||
tfile_instance.name = 'mockTemporaryFile'
|
||||
mocker.patch('tempfile.NamedTemporaryFile', new=new_tfile_obj)
|
||||
|
||||
new_open = mock.MagicMock()
|
||||
fake_file = mock.MagicMock(spec=file)
|
||||
new_open.return_value.__enter__.return_value = fake_file
|
||||
mocker.patch('__builtin__.open', new_open)
|
||||
|
||||
return (new_tfile_obj, tfile_instance, new_open, fake_file)
|
||||
|
||||
|
||||
########################
|
||||
## Test request mangling
|
||||
|
||||
@pytest.inlineCallbacks
|
||||
def test_mangle_request_edit(req, mock_deep_save, mock_tempfile,
|
||||
ignore_edit, ignore_delete):
|
||||
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile
|
||||
r = req
|
||||
new_contents = ('GET / HTTP/1.1\r\n'
|
||||
'Content-Length: 4\r\n\r\n'
|
||||
'BBBB')
|
||||
fake_file.read.return_value = new_contents
|
||||
new_req = yield async_mangle_request(r)
|
||||
assert not mock_deep_save.called
|
||||
assert tfile_obj.called
|
||||
assert tfile_instance.write.called
|
||||
assert tfile_instance.write.call_args == ((r.full_request,),)
|
||||
assert new_open.called
|
||||
assert fake_file.read.called
|
||||
|
||||
assert new_req.full_request == new_contents
|
||||
|
||||
@pytest.inlineCallbacks
|
||||
def test_mangle_request_edit_newlines(req, mock_deep_save, mock_tempfile,
|
||||
ignore_edit, ignore_delete):
|
||||
# Intercepting is off, request in scope
|
||||
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile
|
||||
r = req
|
||||
new_contents = ('GET / HTTP/1.1\r\n'
|
||||
'Test-Head: FOOBIE\n'
|
||||
'Content-Length: 4\n\r\n'
|
||||
'BBBB')
|
||||
fake_file.read.return_value = new_contents
|
||||
new_req = yield async_mangle_request(r)
|
||||
|
||||
assert new_req.full_request == ('GET / HTTP/1.1\r\n'
|
||||
'Test-Head: FOOBIE\r\n'
|
||||
'Content-Length: 4\r\n\r\n'
|
||||
'BBBB')
|
||||
assert new_req.headers['Test-Head'] == 'FOOBIE'
|
||||
|
||||
@pytest.inlineCallbacks
|
||||
def test_mangle_request_drop(req, mock_deep_save, mock_tempfile,
|
||||
ignore_edit, ignore_delete):
|
||||
# Intercepting is off, request in scope
|
||||
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile
|
||||
r = req
|
||||
new_contents = ''
|
||||
fake_file.read.return_value = new_contents
|
||||
new_req = yield async_mangle_request(r)
|
||||
|
||||
assert new_req is None
|
||||
|
||||
@pytest.inlineCallbacks
|
||||
def test_mangle_request_edit_len(req, mock_deep_save, mock_tempfile,
|
||||
ignore_edit, ignore_delete):
|
||||
# Intercepting is off, request in scope
|
||||
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile
|
||||
r = req
|
||||
new_contents = ('GET / HTTP/1.1\r\n'
|
||||
'Test-Head: FOOBIE\n'
|
||||
'Content-Length: 4\n\r\n'
|
||||
'BBBBAAAA')
|
||||
fake_file.read.return_value = new_contents
|
||||
new_req = yield async_mangle_request(r)
|
||||
|
||||
assert new_req.full_request == ('GET / HTTP/1.1\r\n'
|
||||
'Test-Head: FOOBIE\r\n'
|
||||
'Content-Length: 8\r\n\r\n'
|
||||
'BBBBAAAA')
|
||||
|
||||
|
||||
#########################
|
||||
## Test response mangling
|
||||
|
||||
@pytest.inlineCallbacks
|
||||
def test_mangle_response_edit(req_w_rsp, mock_deep_save, mock_tempfile,
|
||||
ignore_edit, ignore_delete):
|
||||
# Intercepting is on, edit
|
||||
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile
|
||||
r = req_w_rsp
|
||||
old_rsp = r.response.full_response
|
||||
new_contents = ('HTTP/1.1 403 NOTOKIEDOKIE\r\n'
|
||||
'Content-Length: 4\r\n'
|
||||
'Other-Header: foobles\r\n\r\n'
|
||||
'BBBB')
|
||||
fake_file.read.return_value = new_contents
|
||||
mangled_rsp = yield async_mangle_response(r)
|
||||
assert not mock_deep_save.called
|
||||
assert tfile_obj.called
|
||||
assert tfile_instance.write.called
|
||||
assert tfile_instance.write.call_args == ((old_rsp,),)
|
||||
assert new_open.called
|
||||
assert fake_file.read.called
|
||||
|
||||
assert mangled_rsp.full_response == new_contents
|
||||
|
||||
@pytest.inlineCallbacks
|
||||
def test_mangle_response_newlines(req_w_rsp, mock_deep_save, mock_tempfile,
|
||||
ignore_edit, ignore_delete):
|
||||
# Intercepting is off, request in scope
|
||||
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile
|
||||
r = req_w_rsp
|
||||
old_rsp = r.response.full_response
|
||||
new_contents = ('HTTP/1.1 403 NOTOKIEDOKIE\n'
|
||||
'Content-Length: 4\n'
|
||||
'Other-Header: foobles\r\n\n'
|
||||
'BBBB')
|
||||
fake_file.read.return_value = new_contents
|
||||
mangled_rsp = yield async_mangle_response(r)
|
||||
|
||||
assert mangled_rsp.full_response == ('HTTP/1.1 403 NOTOKIEDOKIE\r\n'
|
||||
'Content-Length: 4\r\n'
|
||||
'Other-Header: foobles\r\n\r\n'
|
||||
'BBBB')
|
||||
assert mangled_rsp.headers['Other-Header'] == 'foobles'
|
||||
|
||||
@pytest.inlineCallbacks
|
||||
def test_mangle_response_drop(req_w_rsp, mock_deep_save, mock_tempfile,
|
||||
ignore_edit, ignore_delete):
|
||||
# Intercepting is off, request in scope
|
||||
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile
|
||||
r = req_w_rsp
|
||||
old_rsp = r.response.full_response
|
||||
new_contents = ''
|
||||
fake_file.read.return_value = new_contents
|
||||
mangled_rsp = yield async_mangle_response(r)
|
||||
|
||||
assert mangled_rsp is None
|
||||
|
||||
@pytest.inlineCallbacks
|
||||
def test_mangle_response_new_len(req_w_rsp, mock_deep_save, mock_tempfile,
|
||||
ignore_edit, ignore_delete):
|
||||
# Intercepting is off, request in scope
|
||||
tfile_obj, tfile_instance, new_open, fake_file = mock_tempfile
|
||||
r = req_w_rsp
|
||||
old_rsp = r.response.full_response
|
||||
new_contents = ('HTTP/1.1 403 NOTOKIEDOKIE\n'
|
||||
'Content-Length: 4\n'
|
||||
'Other-Header: foobles\r\n\n'
|
||||
'BBBBAAAA')
|
||||
fake_file.read.return_value = new_contents
|
||||
mangled_rsp = yield async_mangle_response(r)
|
||||
|
||||
assert mangled_rsp.full_response == ('HTTP/1.1 403 NOTOKIEDOKIE\r\n'
|
||||
'Content-Length: 8\r\n'
|
||||
'Other-Header: foobles\r\n\r\n'
|
||||
'BBBBAAAA')
|
|
@ -11,8 +11,8 @@ def test_filter_reqs():
|
|||
pass
|
||||
|
||||
def test_gen_filter_by_all_request():
|
||||
f = context.gen_filter_by_all(context.cmp_contains, 'hello')
|
||||
fn = context.gen_filter_by_all(context.cmp_contains, 'hello', negate=True)
|
||||
f = context.gen_filter_by_all(['ct', 'hello'])
|
||||
fn = context.gen_filter_by_all(['nct', 'hello'])
|
||||
|
||||
# Nowhere
|
||||
r = Request('GET / HTTP/1.1\r\n')
|
||||
|
@ -31,7 +31,7 @@ def test_gen_filter_by_all_request():
|
|||
|
||||
# Data
|
||||
r = Request('GET / HTTP/1.1\r\n')
|
||||
r.raw_data = 'hello'
|
||||
r.body = 'hello'
|
||||
assert f(r)
|
||||
assert not fn(r)
|
||||
|
||||
|
@ -73,8 +73,8 @@ def test_gen_filter_by_all_request():
|
|||
|
||||
|
||||
def test_gen_filter_by_all_response(http_request):
|
||||
f = context.gen_filter_by_all(context.cmp_contains, 'hello')
|
||||
fn = context.gen_filter_by_all(context.cmp_contains, 'hello', negate=True)
|
||||
f = context.gen_filter_by_all(['ct', 'hello'])
|
||||
fn = context.gen_filter_by_all(['nct', 'hello'])
|
||||
|
||||
# Nowhere
|
||||
r = Response('HTTP/1.1 200 OK\r\n')
|
||||
|
@ -91,7 +91,7 @@ def test_gen_filter_by_all_response(http_request):
|
|||
# Data
|
||||
r = Response('HTTP/1.1 200 OK\r\n')
|
||||
http_request.response = r
|
||||
r.raw_data = 'hello'
|
||||
r.body = 'hello'
|
||||
assert f(http_request)
|
||||
assert not fn(http_request)
|
||||
|
||||
|
@ -138,8 +138,8 @@ def test_gen_filter_by_all_response(http_request):
|
|||
assert fn(http_request)
|
||||
|
||||
def test_filter_by_host(http_request):
|
||||
f = context.gen_filter_by_host(context.cmp_contains, 'sexy')
|
||||
fn = context.gen_filter_by_host(context.cmp_contains, 'sexy', negate=True)
|
||||
f = context.gen_filter_by_host(['ct', 'sexy'])
|
||||
fn = context.gen_filter_by_host(['nct', 'sexy'])
|
||||
|
||||
http_request.headers['Host'] = 'google.com'
|
||||
http_request.headers['MiscHeader'] = 'vim.sexy'
|
||||
|
@ -152,55 +152,55 @@ def test_filter_by_host(http_request):
|
|||
assert not fn(http_request)
|
||||
|
||||
def test_filter_by_body():
|
||||
f = context.gen_filter_by_body(context.cmp_contains, 'sexy')
|
||||
fn = context.gen_filter_by_body(context.cmp_contains, 'sexy', negate=True)
|
||||
f = context.gen_filter_by_body(['ct', 'sexy'])
|
||||
fn = context.gen_filter_by_body(['nct', 'sexy'])
|
||||
|
||||
# Test request bodies
|
||||
r = Request()
|
||||
r.status_line = 'GET /sexy HTTP/1.1'
|
||||
r.start_line = 'GET /sexy HTTP/1.1'
|
||||
r.headers['Header'] = 'sexy'
|
||||
r.raw_data = 'foo'
|
||||
r.body = 'foo'
|
||||
assert not f(r)
|
||||
assert fn(r)
|
||||
|
||||
r.raw_data = 'sexy'
|
||||
r.body = 'sexy'
|
||||
assert f(r)
|
||||
assert not fn(r)
|
||||
|
||||
# Test response bodies
|
||||
r = Request()
|
||||
rsp = Response()
|
||||
rsp.status_line = 'HTTP/1.1 200 OK'
|
||||
rsp.start_line = 'HTTP/1.1 200 OK'
|
||||
rsp.headers['sexy'] = 'sexy'
|
||||
r.status_line = 'GET /sexy HTTP/1.1'
|
||||
r.start_line = 'GET /sexy HTTP/1.1'
|
||||
r.headers['Header'] = 'sexy'
|
||||
r.response = rsp
|
||||
assert not f(r)
|
||||
assert fn(r)
|
||||
|
||||
rsp.raw_data = 'sexy'
|
||||
rsp.body = 'sexy'
|
||||
assert f(r)
|
||||
assert not fn(r)
|
||||
|
||||
def test_filter_by_response_code(http_request):
|
||||
f = context.gen_filter_by_response_code(context.cmp_eq, 200)
|
||||
fn = context.gen_filter_by_response_code(context.cmp_eq, 200, negate=True)
|
||||
f = context.gen_filter_by_response_code(['eq', '200'])
|
||||
fn = context.gen_filter_by_response_code(['neq', '200'])
|
||||
|
||||
r = Response()
|
||||
http_request.response = r
|
||||
r.status_line = 'HTTP/1.1 404 Not Found'
|
||||
r.start_line = 'HTTP/1.1 404 Not Found'
|
||||
assert not f(http_request)
|
||||
assert fn(http_request)
|
||||
|
||||
r.status_line = 'HTTP/1.1 200 OK'
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
assert f(http_request)
|
||||
assert not fn(http_request)
|
||||
|
||||
def test_filter_by_raw_headers_request():
|
||||
f1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:')
|
||||
fn1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:', negate=True)
|
||||
f2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader')
|
||||
fn2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader', negate=True)
|
||||
f1 = context.gen_filter_by_raw_headers(['ct', 'Sexy:'])
|
||||
fn1 = context.gen_filter_by_raw_headers(['nct', 'Sexy:'])
|
||||
f2 = context.gen_filter_by_raw_headers(['ct', 'sexy\r\nHeader'])
|
||||
fn2 = context.gen_filter_by_raw_headers(['nct', 'sexy\r\nHeader'])
|
||||
|
||||
r = Request('GET / HTTP/1.1\r\n')
|
||||
rsp = Response('HTTP/1.1 200 OK\r\n')
|
||||
|
@ -228,10 +228,10 @@ def test_filter_by_raw_headers_request():
|
|||
assert not fn2(r)
|
||||
|
||||
def test_filter_by_raw_headers_response():
|
||||
f1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:')
|
||||
fn1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:', negate=True)
|
||||
f2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader')
|
||||
fn2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader', negate=True)
|
||||
f1 = context.gen_filter_by_raw_headers(['ct', 'Sexy:'])
|
||||
fn1 = context.gen_filter_by_raw_headers(['nct', 'Sexy:'])
|
||||
f2 = context.gen_filter_by_raw_headers(['ct', 'sexy\r\nHeader'])
|
||||
fn2 = context.gen_filter_by_raw_headers(['nct', 'sexy\r\nHeader'])
|
||||
|
||||
r = Request('GET / HTTP/1.1\r\n')
|
||||
rsp = Response('HTTP/1.1 200 OK\r\n')
|
||||
|
@ -259,25 +259,24 @@ def test_filter_by_raw_headers_response():
|
|||
assert not fn2(r)
|
||||
|
||||
def test_filter_by_path(http_request):
|
||||
f = context.gen_filter_by_path(context.cmp_contains, 'porn') # find the fun websites
|
||||
fn = context.gen_filter_by_path(context.cmp_contains, 'porn', negate=True) # find the boring websites
|
||||
f = context.gen_filter_by_path(['ct', 'porn']) # find the fun websites
|
||||
fn = context.gen_filter_by_path(['nct', 'porn']) # find the boring websites
|
||||
|
||||
http_request.status_line = 'GET / HTTP/1.1'
|
||||
http_request.start_line = 'GET / HTTP/1.1'
|
||||
assert not f(http_request)
|
||||
assert fn(http_request)
|
||||
|
||||
http_request.status_line = 'GET /path/to/great/porn HTTP/1.1'
|
||||
http_request.start_line = 'GET /path/to/great/porn HTTP/1.1'
|
||||
assert f(http_request)
|
||||
assert not fn(http_request)
|
||||
|
||||
http_request.status_line = 'GET /path/to/porn/great HTTP/1.1'
|
||||
http_request.start_line = 'GET /path/to/porn/great HTTP/1.1'
|
||||
assert f(http_request)
|
||||
assert not fn(http_request)
|
||||
|
||||
def test_gen_filter_by_submitted_cookies():
|
||||
f1 = context.gen_filter_by_submitted_cookies(context.cmp_contains, 'Session')
|
||||
f2 = context.gen_filter_by_submitted_cookies(context.cmp_contains, 'Cookie',
|
||||
context.cmp_contains, 'CookieVal')
|
||||
f1 = context.gen_filter_by_submitted_cookies(['ct', 'Session'])
|
||||
f2 = context.gen_filter_by_submitted_cookies(['ct', 'Cookie', 'nct', 'CookieVal'])
|
||||
r = Request(('GET / HTTP/1.1\r\n'
|
||||
'Cookie: foo=bar\r\n'
|
||||
'\r\n'))
|
||||
|
@ -294,18 +293,17 @@ def test_gen_filter_by_submitted_cookies():
|
|||
'Cookie: Session=bar; CookieThing=NoMatch\r\n'
|
||||
'\r\n'))
|
||||
assert f1(r)
|
||||
assert not f2(r)
|
||||
assert f2(r)
|
||||
|
||||
r = Request(('GET / HTTP/1.1\r\n'
|
||||
'Cookie: Session=bar; CookieThing=CookieValue\r\n'
|
||||
'\r\n'))
|
||||
assert f1(r)
|
||||
assert f2(r)
|
||||
assert not f2(r)
|
||||
|
||||
def test_gen_filter_by_set_cookies():
|
||||
f1 = context.gen_filter_by_set_cookies(context.cmp_contains, 'Session')
|
||||
f2 = context.gen_filter_by_set_cookies(context.cmp_contains, 'Cookie',
|
||||
context.cmp_contains, 'CookieVal')
|
||||
f1 = context.gen_filter_by_set_cookies(['ct', 'Session'])
|
||||
f2 = context.gen_filter_by_set_cookies(['ct', 'Cookie', 'ct', 'CookieVal'])
|
||||
|
||||
r = Request('GET / HTTP/1.1\r\n\r\n')
|
||||
rsp = Response(('HTTP/1.1 200 OK\r\n'
|
||||
|
@ -345,9 +343,8 @@ def test_gen_filter_by_set_cookies():
|
|||
assert f2(r)
|
||||
|
||||
def test_filter_by_params_get():
|
||||
f1 = context.gen_filter_by_params(context.cmp_contains, 'Session')
|
||||
f2 = context.gen_filter_by_params(context.cmp_contains, 'Cookie',
|
||||
context.cmp_contains, 'CookieVal')
|
||||
f1 = context.gen_filter_by_params(['ct', 'Session'])
|
||||
f2 = context.gen_filter_by_params(['ct', 'Cookie', 'ct', 'CookieVal'])
|
||||
|
||||
r = Request('GET / HTTP/1.1\r\n\r\n')
|
||||
assert not f1(r)
|
||||
|
@ -366,30 +363,29 @@ def test_filter_by_params_get():
|
|||
assert f2(r)
|
||||
|
||||
def test_filter_by_params_post():
|
||||
f1 = context.gen_filter_by_params(context.cmp_contains, 'Session')
|
||||
f2 = context.gen_filter_by_params(context.cmp_contains, 'Cookie',
|
||||
context.cmp_contains, 'CookieVal')
|
||||
f1 = context.gen_filter_by_params(['ct', 'Session'])
|
||||
f2 = context.gen_filter_by_params(['ct', 'Cookie', 'ct', 'CookieVal'])
|
||||
|
||||
r = Request(('GET / HTTP/1.1\r\n'
|
||||
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
|
||||
r.raw_data = 'foo=bar'
|
||||
r.body = 'foo=bar'
|
||||
assert not f1(r)
|
||||
assert not f2(r)
|
||||
|
||||
r = Request(('GET / HTTP/1.1\r\n'
|
||||
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
|
||||
r.raw_data = 'Session=bar'
|
||||
r.body = 'Session=bar'
|
||||
assert f1(r)
|
||||
assert not f2(r)
|
||||
|
||||
r = Request(('GET / HTTP/1.1\r\n'
|
||||
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
|
||||
r.raw_data = 'Session=bar&Cookie=foo'
|
||||
r.body = 'Session=bar&Cookie=foo'
|
||||
assert f1(r)
|
||||
assert not f2(r)
|
||||
|
||||
r = Request(('GET / HTTP/1.1\r\n'
|
||||
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
|
||||
r.raw_data = 'Session=bar&CookieThing=CookieValue'
|
||||
r.body = 'Session=bar&CookieThing=CookieValue'
|
||||
assert f1(r)
|
||||
assert f2(r)
|
||||
|
|
|
@ -86,7 +86,7 @@ def test_chunked_simple():
|
|||
full_data += '0\r\n\r\n'
|
||||
c.add_data(full_data)
|
||||
assert c.complete
|
||||
assert c.raw_data == 'A'*5
|
||||
assert c.body == 'A'*5
|
||||
|
||||
def test_chunked_hex():
|
||||
# Test hex lengths
|
||||
|
@ -97,7 +97,7 @@ def test_chunked_hex():
|
|||
full_data += '0\r\n\r\n'
|
||||
c.add_data(full_data)
|
||||
assert c.complete
|
||||
assert c.raw_data == 'A'*0xAF
|
||||
assert c.body == 'A'*0xAF
|
||||
|
||||
c = http.ChunkedData()
|
||||
full_data = 'AF\r\n'
|
||||
|
@ -106,7 +106,7 @@ def test_chunked_hex():
|
|||
full_data += '0\r\n\r\n'
|
||||
c.add_data(full_data)
|
||||
assert c.complete
|
||||
assert c.raw_data == 'A'*0xAF
|
||||
assert c.body == 'A'*0xAF
|
||||
|
||||
c = http.ChunkedData()
|
||||
full_data = 'aF\r\n'
|
||||
|
@ -115,7 +115,7 @@ def test_chunked_hex():
|
|||
full_data += '0\r\n\r\n'
|
||||
c.add_data(full_data)
|
||||
assert c.complete
|
||||
assert c.raw_data == 'A'*0xAF
|
||||
assert c.body == 'A'*0xAF
|
||||
|
||||
def test_chunked_leading_zeros():
|
||||
# Test leading zeros
|
||||
|
@ -126,7 +126,7 @@ def test_chunked_leading_zeros():
|
|||
full_data += '0\r\n\r\n'
|
||||
c.add_data(full_data)
|
||||
assert c.complete
|
||||
assert c.raw_data == 'A'*0xAF
|
||||
assert c.body == 'A'*0xAF
|
||||
|
||||
def test_chunked_one_char_add():
|
||||
# Test adding one character at a time
|
||||
|
@ -138,7 +138,7 @@ def test_chunked_one_char_add():
|
|||
for ch in full_data:
|
||||
c.add_data(ch)
|
||||
assert c.complete
|
||||
assert c.raw_data == 'A'*0xAF
|
||||
assert c.body == 'A'*0xAF
|
||||
|
||||
def test_chunked_incomplete():
|
||||
# Tests that complete isn't true until the data is received
|
||||
|
@ -168,11 +168,11 @@ def test_length_data_simple():
|
|||
assert not l.complete
|
||||
l.add_data('A'*100)
|
||||
assert l.complete
|
||||
assert l.raw_data == 'A'*100
|
||||
assert l.body == 'A'*100
|
||||
|
||||
l = http.LengthData(0)
|
||||
assert l.complete
|
||||
assert l.raw_data == ''
|
||||
assert l.body == ''
|
||||
|
||||
# Test incomplete
|
||||
l = http.LengthData(100)
|
||||
|
@ -185,7 +185,7 @@ def test_length_one_character():
|
|||
for i in range(100):
|
||||
l.add_data('A')
|
||||
assert l.complete
|
||||
assert l.raw_data == 'A'*100
|
||||
assert l.body == 'A'*100
|
||||
|
||||
# Test adding one character at a time (incomplete)
|
||||
l = http.LengthData(100)
|
||||
|
@ -198,7 +198,7 @@ def test_length_overflow():
|
|||
l = http.LengthData(100)
|
||||
l.add_data('A'*400)
|
||||
assert l.complete
|
||||
assert l.raw_data == 'A'*100
|
||||
assert l.body == 'A'*100
|
||||
|
||||
# Test throwing an exception when adding data after complete
|
||||
l = http.LengthData(100)
|
||||
|
@ -369,7 +369,80 @@ def test_response_cookie_blank():
|
|||
assert c.val == ''
|
||||
assert c.path == '/'
|
||||
assert c.secure
|
||||
|
||||
####################
|
||||
## HTTPMessage tests
|
||||
|
||||
def test_message_simple():
|
||||
raw = ('foobar\r\n'
|
||||
'a: b\r\n'
|
||||
'Content-Length: 100\r\n\r\n')
|
||||
raw += 'A'*100
|
||||
m = http.HTTPMessage(raw)
|
||||
assert m.complete
|
||||
assert m.malformed == False
|
||||
assert m.start_line == 'foobar'
|
||||
assert m.body == 'A'*100
|
||||
assert m.headers.all_pairs() == [('a', 'b'), ('Content-Length', '100')]
|
||||
assert m.headers['A'] == 'b'
|
||||
assert m.headers_section == ('foobar\r\n'
|
||||
'a: b\r\n'
|
||||
'Content-Length: 100\r\n\r\n')
|
||||
assert m.full_message == raw
|
||||
|
||||
def test_message_build():
|
||||
raw = ('foobar\r\n'
|
||||
'a: b\r\n'
|
||||
'Content-Length: 100\r\n\r\n')
|
||||
raw += 'A'*100
|
||||
m = http.HTTPMessage()
|
||||
m.add_line('foobar')
|
||||
m.add_line('a: b')
|
||||
m.add_line('Content-Length: 100')
|
||||
m.add_line('')
|
||||
assert not m.complete
|
||||
m.add_data('A'*50)
|
||||
assert not m.complete
|
||||
m.add_data('A'*50)
|
||||
assert m.complete
|
||||
assert m.malformed == False
|
||||
assert m.start_line == 'foobar'
|
||||
assert m.body == 'A'*100
|
||||
assert m.headers.all_pairs() == [('a', 'b'), ('Content-Length', '100')]
|
||||
assert m.headers['A'] == 'b'
|
||||
assert m.headers_section == ('foobar\r\n'
|
||||
'a: b\r\n'
|
||||
'Content-Length: 100\r\n\r\n')
|
||||
assert m.full_message == raw
|
||||
|
||||
def test_message_build_chunked():
|
||||
raw = ('foobar\r\n'
|
||||
'a: b\r\n'
|
||||
'Content-Length: 100\r\n\r\n')
|
||||
raw += 'A'*100
|
||||
m = http.HTTPMessage()
|
||||
m.add_line('foobar')
|
||||
m.add_line('a: b')
|
||||
m.add_line('Transfer-Encoding: chunked')
|
||||
m.add_line('')
|
||||
assert not m.complete
|
||||
m.add_data('%x\r\n' % 50)
|
||||
m.add_data('A'*50)
|
||||
m.add_data('\r\n')
|
||||
m.add_data('%x\r\n' % 50)
|
||||
m.add_data('A'*50)
|
||||
m.add_data('\r\n')
|
||||
m.add_data('0\r\n')
|
||||
assert m.complete
|
||||
assert m.malformed == False
|
||||
assert m.start_line == 'foobar'
|
||||
assert m.body == 'A'*100
|
||||
assert m.headers.all_pairs() == [('a', 'b'), ('Content-Length', '100')]
|
||||
assert m.headers['A'] == 'b'
|
||||
assert m.headers_section == ('foobar\r\n'
|
||||
'a: b\r\n'
|
||||
'Content-Length: 100\r\n\r\n')
|
||||
assert m.full_message == raw
|
||||
|
||||
####################
|
||||
## Request tests
|
||||
|
@ -398,7 +471,7 @@ def test_request_simple():
|
|||
assert r.is_ssl == False
|
||||
assert r.path == '/'
|
||||
assert r.port == 80
|
||||
assert r.status_line == 'GET / HTTP/1.1'
|
||||
assert r.start_line == 'GET / HTTP/1.1'
|
||||
assert r.verb == 'GET'
|
||||
assert r.version == 'HTTP/1.1'
|
||||
assert r.headers['Content-Length'] == '100'
|
||||
|
@ -409,7 +482,7 @@ def test_request_simple():
|
|||
assert r.headers['Host'] == 'www.test.com'
|
||||
assert r.headers['Connection'] == 'Keep-Alive'
|
||||
assert r.headers['Cache-Control'] == 'no-cache'
|
||||
assert r.raw_data == 'A'*100
|
||||
assert r.body == 'A'*100
|
||||
test(rf)
|
||||
test(rl)
|
||||
test(ru)
|
||||
|
@ -536,6 +609,7 @@ def test_request_parse_host():
|
|||
rf, rl, ru, rj = req_by_lines_and_full(header_lines)
|
||||
def test(r):
|
||||
assert r.complete
|
||||
assert r.port == 443
|
||||
assert r.host == 'www.test.com'
|
||||
assert r.is_ssl
|
||||
test(rf)
|
||||
|
@ -574,7 +648,7 @@ def test_repeated_request_headers():
|
|||
|
||||
def test_request_update_statusline():
|
||||
r = http.Request()
|
||||
r.status_line = 'GET / HTTP/1.1'
|
||||
r.start_line = 'GET / HTTP/1.1'
|
||||
assert r.verb == 'GET'
|
||||
assert r.path == '/'
|
||||
assert r.version == 'HTTP/1.1'
|
||||
|
@ -584,7 +658,7 @@ def test_request_update_statusline():
|
|||
|
||||
def test_request_update_cookies():
|
||||
r = http.Request()
|
||||
r.status_line = 'GET / HTTP/1.1'
|
||||
r.start_line = 'GET / HTTP/1.1'
|
||||
|
||||
# Check new cookies
|
||||
r.cookies['foo'] = 'bar'
|
||||
|
@ -607,7 +681,7 @@ def test_request_update_cookies():
|
|||
|
||||
def test_request_update_headers():
|
||||
r = http.Request()
|
||||
r.status_line = 'GET / HTTP/1.1'
|
||||
r.start_line = 'GET / HTTP/1.1'
|
||||
r.headers['Content-Length'] = '0'
|
||||
r.headers['Test-Header'] = 'Test Value'
|
||||
r.headers['Other-Header'] = 'Other Value'
|
||||
|
@ -624,11 +698,11 @@ def test_request_update_headers():
|
|||
|
||||
def test_request_modified_headers():
|
||||
r = http.Request()
|
||||
r.status_line = 'GET / HTTP/1.1'
|
||||
r.start_line = 'GET / HTTP/1.1'
|
||||
r.headers['content-length'] = '100'
|
||||
r.headers['cookie'] = 'abc=123'
|
||||
r.cookies['abc'] = '456'
|
||||
r.raw_data = 'AAAA'
|
||||
r.body = 'AAAA'
|
||||
assert r.full_request == ('GET / HTTP/1.1\r\n'
|
||||
'content-length: 4\r\n'
|
||||
'cookie: abc=456\r\n\r\n'
|
||||
|
@ -638,33 +712,34 @@ def test_request_modified_headers():
|
|||
|
||||
def test_request_update_data():
|
||||
r = http.Request()
|
||||
r.status_line = 'GET / HTTP/1.1'
|
||||
r.start_line = 'GET / HTTP/1.1'
|
||||
r.headers['content-length'] = 500
|
||||
r.raw_data = 'AAAA'
|
||||
r.body = 'AAAA'
|
||||
assert r.full_request == ('GET / HTTP/1.1\r\n'
|
||||
'content-length: 4\r\n'
|
||||
'\r\n'
|
||||
'AAAA')
|
||||
def test_request_to_json():
|
||||
r = http.Request()
|
||||
r.status_line = 'GET / HTTP/1.1'
|
||||
r.start_line = 'GET / HTTP/1.1'
|
||||
r.headers['content-length'] = 500
|
||||
r.tags = ['foo', 'bar']
|
||||
r.raw_data = 'AAAA'
|
||||
r.body = 'AAAA'
|
||||
r.reqid = '1'
|
||||
|
||||
rsp = http.Response()
|
||||
rsp.status_line = 'HTTP/1.1 200 OK'
|
||||
rsp.start_line = 'HTTP/1.1 200 OK'
|
||||
rsp.rspid = '2'
|
||||
|
||||
r.response = rsp
|
||||
|
||||
expected_reqdata = {u'full_request': unicode(base64.b64encode(r.full_request)),
|
||||
expected_reqdata = {u'full_message': unicode(base64.b64encode(r.full_request)),
|
||||
u'response_id': str(rsp.rspid),
|
||||
u'port': 80,
|
||||
u'is_ssl': False,
|
||||
u'tags': ['foo', 'bar'],
|
||||
u'reqid': str(r.reqid),
|
||||
u'host': '',
|
||||
}
|
||||
|
||||
assert json.loads(r.to_json()) == expected_reqdata
|
||||
|
@ -764,7 +839,7 @@ def test_request_copy():
|
|||
|
||||
def test_request_url_blankpath():
|
||||
r = http.Request()
|
||||
r.status_line = 'GET / HTTP/1.1'
|
||||
r.start_line = 'GET / HTTP/1.1'
|
||||
r.url = 'https://www.google.com'
|
||||
r.headers['Host'] = r.host
|
||||
r.url_params.from_dict({'foo': 'bar'})
|
||||
|
@ -789,10 +864,10 @@ def test_response_simple():
|
|||
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data)
|
||||
def test(r):
|
||||
assert r.complete
|
||||
assert r.raw_data == data
|
||||
assert r.body == data
|
||||
assert r.response_code == 200
|
||||
assert r.response_text == 'OK'
|
||||
assert r.status_line == 'HTTP/1.1 200 OK'
|
||||
assert r.start_line == 'HTTP/1.1 200 OK'
|
||||
assert r.version == 'HTTP/1.1'
|
||||
|
||||
assert r.headers['Date'] == 'Thu, 22 Oct 2015 00:37:17 GMT'
|
||||
|
@ -826,7 +901,7 @@ def test_response_chunked():
|
|||
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data)
|
||||
def test(r):
|
||||
assert r.complete
|
||||
assert r.raw_data == 'A'*0xAF + 'B'*0xBF
|
||||
assert r.body == 'A'*0xAF + 'B'*0xBF
|
||||
|
||||
test(rf)
|
||||
test(rl)
|
||||
|
@ -851,7 +926,7 @@ def test_response_gzip():
|
|||
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data_comp)
|
||||
def test(r):
|
||||
assert r.complete
|
||||
assert r.raw_data == data_decomp
|
||||
assert r.body == data_decomp
|
||||
|
||||
test(rf)
|
||||
test(rl)
|
||||
|
@ -876,7 +951,7 @@ def test_response_deflate():
|
|||
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data_comp)
|
||||
def test(r):
|
||||
assert r.complete
|
||||
assert r.raw_data == data_decomp
|
||||
assert r.body == data_decomp
|
||||
|
||||
test(rf)
|
||||
test(rl)
|
||||
|
@ -907,7 +982,7 @@ def test_response_chunked_gzip():
|
|||
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data_chunked)
|
||||
def test(r):
|
||||
assert r.complete
|
||||
assert r.raw_data == data_decomp
|
||||
assert r.body == data_decomp
|
||||
assert r.headers['Content-Length'] == str(len(data_decomp))
|
||||
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
|
||||
'Date: Thu, 22 Oct 2015 00:37:17 GMT\r\n'
|
||||
|
@ -924,7 +999,7 @@ def test_response_chunked_gzip():
|
|||
|
||||
def test_response_early_completion():
|
||||
r = http.Response()
|
||||
r.status_line = 'HTTP/1.1 200 OK'
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
r.add_line('Content-Length: 0')
|
||||
assert not r.complete
|
||||
r.add_line('')
|
||||
|
@ -992,7 +1067,7 @@ def test_repeated_response_headers():
|
|||
|
||||
def test_response_update_statusline():
|
||||
r = http.Response()
|
||||
r.status_line = 'HTTP/1.1 200 OK'
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
assert r.version == 'HTTP/1.1'
|
||||
assert r.response_code == 200
|
||||
assert r.response_text == 'OK'
|
||||
|
@ -1002,7 +1077,7 @@ def test_response_update_statusline():
|
|||
|
||||
def test_response_update_headers():
|
||||
r = http.Response()
|
||||
r.status_line = 'HTTP/1.1 200 OK'
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
r.headers['Test-Header'] = 'Test Value'
|
||||
r.headers['Other-Header'] = 'Other Value'
|
||||
|
||||
|
@ -1018,9 +1093,9 @@ def test_response_update_headers():
|
|||
|
||||
def test_response_update_modified_headers():
|
||||
r = http.Response()
|
||||
r.status_line = 'HTTP/1.1 200 OK'
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
r.headers['content-length'] = '500'
|
||||
r.raw_data = 'AAAA'
|
||||
r.body = 'AAAA'
|
||||
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
|
||||
'content-length: 4\r\n\r\n'
|
||||
'AAAA')
|
||||
|
@ -1028,7 +1103,7 @@ def test_response_update_modified_headers():
|
|||
|
||||
def test_response_update_cookies():
|
||||
r = http.Response()
|
||||
r.status_line = 'HTTP/1.1 200 OK'
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
# Test by adding headers
|
||||
r.headers['Set-Cookie'] = 'abc=123'
|
||||
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
|
||||
|
@ -1042,7 +1117,7 @@ def test_response_update_cookies():
|
|||
assert r.cookies['abc'].val == '456'
|
||||
|
||||
r = http.Response()
|
||||
r.status_line = 'HTTP/1.1 200 OK'
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
# Test by adding cookie objects
|
||||
c = http.ResponseCookie('abc=123; secure')
|
||||
r.cookies['abc'] = c
|
||||
|
@ -1060,10 +1135,10 @@ def test_response_update_content_length():
|
|||
|
||||
def test_response_to_json():
|
||||
rsp = http.Response()
|
||||
rsp.status_line = 'HTTP/1.1 200 OK'
|
||||
rsp.start_line = 'HTTP/1.1 200 OK'
|
||||
rsp.rspid = 2
|
||||
|
||||
expected_reqdata = {'full_response': base64.b64encode(rsp.full_response),
|
||||
expected_reqdata = {'full_message': base64.b64encode(rsp.full_response),
|
||||
'rspid': rsp.rspid,
|
||||
#'tag': r.tag,
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@ import twisted.test
|
|||
|
||||
from pappyproxy import http
|
||||
from pappyproxy import macros
|
||||
from pappyproxy import mangle
|
||||
from pappyproxy import config
|
||||
from pappyproxy.proxy import ProxyClient, ProxyClientFactory, ProxyServerFactory
|
||||
from testutil import mock_deferred, func_deleted, func_ignored_deferred, func_ignored, no_tcp
|
||||
|
@ -18,7 +17,7 @@ from twisted.internet import defer, reactor
|
|||
## Fixtures
|
||||
|
||||
MANGLED_REQ = 'GET /mangled HTTP/1.1\r\n\r\n'
|
||||
MANGLED_RSP = 'HTTP/1.1 500 MANGLED\r\n\r\n'
|
||||
MANGLED_RSP = 'HTTP/1.1 500 MANGLED\r\nContent-Length: 0\r\n\r\n'
|
||||
|
||||
@pytest.fixture
|
||||
def unconnected_proxyserver(mocker):
|
||||
|
@ -140,25 +139,25 @@ def gen_mangle_macro(modified_req=None, modified_rsp=None,
|
|||
macro = mock.MagicMock()
|
||||
if modified_req or drop_req:
|
||||
macro.async_req = True
|
||||
macro.do_req = True
|
||||
macro.intercept_requests = True
|
||||
if drop_req:
|
||||
newreq = None
|
||||
else:
|
||||
newreq = http.Request(modified_req)
|
||||
macro.async_mangle_request.return_value = mock_deferred(newreq)
|
||||
else:
|
||||
macro.do_req = False
|
||||
macro.intercept_requests = False
|
||||
|
||||
if modified_rsp or drop_rsp:
|
||||
macro.async_rsp = True
|
||||
macro.do_rsp = True
|
||||
macro.intercept_responses = True
|
||||
if drop_rsp:
|
||||
newrsp = None
|
||||
else:
|
||||
newrsp = http.Response(modified_rsp)
|
||||
macro.async_mangle_response.return_value = mock_deferred(newrsp)
|
||||
else:
|
||||
macro.do_rsp = False
|
||||
macro.intercept_responses = False
|
||||
return macro
|
||||
|
||||
def notouch_mangle_req(request):
|
||||
|
@ -255,7 +254,7 @@ def test_proxy_client_mangle_rsp(mocker, proxy_connection, in_scope_true):
|
|||
prot.lineReceived('')
|
||||
req = yield retreq_deferred
|
||||
response = req.response.full_response
|
||||
assert response == 'HTTP/1.1 500 MANGLED\r\n\r\n'
|
||||
assert response == 'HTTP/1.1 500 MANGLED\r\nContent-Length: 0\r\n\r\n'
|
||||
|
||||
@pytest.inlineCallbacks
|
||||
def test_proxy_drop_req(mocker, proxy_connection, in_scope_true):
|
||||
|
|
|
@ -5,13 +5,13 @@ from pappyproxy.http import Request, Response, ResponseCookie
|
|||
@pytest.fixture
|
||||
def req():
|
||||
r = Request()
|
||||
r.status_line = 'GET / HTTP/1.1'
|
||||
r.start_line = 'GET / HTTP/1.1'
|
||||
return r
|
||||
|
||||
@pytest.fixture
|
||||
def rsp():
|
||||
r = Response()
|
||||
r.status_line = 'HTTP/1.1 200 OK'
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
return r
|
||||
|
||||
def test_session_basic(req, rsp):
|
||||
|
@ -96,7 +96,7 @@ def test_session_mixed(req, rsp):
|
|||
assert 'auth' not in rsp.headers
|
||||
|
||||
r = Response()
|
||||
r.status_line = 'HTTP/1.1 200 OK'
|
||||
r.start_line = 'HTTP/1.1 200 OK'
|
||||
r.set_cookie(ResponseCookie('state=bazzers'))
|
||||
r.set_cookie(ResponseCookie('session=buzzers'))
|
||||
s.get_rsp(r)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue