Release 0.0.1

See README diff for changes
This commit is contained in:
Rob Glew 2015-11-23 22:44:31 -06:00
parent 6633423420
commit 4e6801e4d8
39 changed files with 917 additions and 443 deletions

View file

View file

@ -0,0 +1,402 @@
import pytest
from pappyproxy import context
from pappyproxy.http import Request, Response, ResponseCookie
@pytest.fixture
def http_request():
return Request('GET / HTTP/1.1\r\n')
def test_filter_reqs():
pass
def test_gen_filter_by_all_request():
f = context.gen_filter_by_all(context.cmp_contains, 'hello')
fn = context.gen_filter_by_all(context.cmp_contains, 'hello', negate=True)
# Nowhere
r = Request('GET / HTTP/1.1\r\n')
assert not f(r)
assert fn(r)
# Verb
r = Request('hello / HTTP/1.1\r\n')
assert f(r)
assert not fn(r)
# Path
r = Request('GET /hello HTTP/1.1\r\n')
assert f(r)
assert not fn(r)
# Data
r = Request('GET / HTTP/1.1\r\n')
r.raw_data = 'hello'
assert f(r)
assert not fn(r)
# Header key
r = Request('GET / HTTP/1.1\r\n')
r.headers['hello'] = 'goodbye'
assert f(r)
assert not fn(r)
# Header value
r = Request('GET / HTTP/1.1\r\n')
r.headers['goodbye'] = 'hello'
assert f(r)
assert not fn(r)
# Nowhere in headers
r = Request('GET / HTTP/1.1\r\n')
r.headers['goodbye'] = 'for real'
assert not f(r)
assert fn(r)
# Cookie key
r = Request('GET / HTTP/1.1\r\n')
r.cookies['hello'] = 'world'
r.update_from_objects()
assert f(r)
assert not fn(r)
# Cookie value
r = Request('GET / HTTP/1.1\r\n')
r.cookies['world'] = 'hello'
r.update_from_objects()
assert f(r)
assert not fn(r)
# Nowhere in cookie
r = Request('GET / HTTP/1.1\r\n')
r.cookies['world'] = 'sucks'
r.update_from_objects()
assert not f(r)
assert fn(r)
def test_gen_filter_by_all_response(http_request):
f = context.gen_filter_by_all(context.cmp_contains, 'hello')
fn = context.gen_filter_by_all(context.cmp_contains, 'hello', negate=True)
# Nowhere
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
assert not f(http_request)
assert fn(http_request)
# Response text
r = Response('HTTP/1.1 200 hello\r\n')
http_request.response = r
assert f(http_request)
assert not fn(http_request)
# Data
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.raw_data = 'hello'
assert f(http_request)
assert not fn(http_request)
# Header key
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.headers['hello'] = 'goodbye'
assert f(http_request)
assert not fn(http_request)
# Header value
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.headers['goodbye'] = 'hello'
assert f(http_request)
assert not fn(http_request)
# Nowhere in headers
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.headers['goodbye'] = 'for real'
assert not f(http_request)
assert fn(http_request)
# Cookie key
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.add_cookie(ResponseCookie('hello=goodbye'))
r.update_from_objects()
assert f(http_request)
assert not fn(http_request)
# Cookie value
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.add_cookie(ResponseCookie('goodbye=hello'))
r.update_from_objects()
assert f(http_request)
assert not fn(http_request)
# Nowhere in cookie
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.add_cookie(ResponseCookie('goodbye=for real'))
r.update_from_objects()
assert not f(http_request)
assert fn(http_request)
def test_filter_by_host(http_request):
f = context.gen_filter_by_host(context.cmp_contains, 'sexy')
fn = context.gen_filter_by_host(context.cmp_contains, 'sexy', negate=True)
http_request.headers['Host'] = 'google.com'
http_request.headers['MiscHeader'] = 'vim.sexy'
assert not f(http_request)
assert fn(http_request)
http_request.headers['Host'] = 'vim.sexy'
http_request.update_from_text()
assert http_request.host == 'vim.sexy'
assert f(http_request)
assert not fn(http_request)
def test_filter_by_body():
f = context.gen_filter_by_body(context.cmp_contains, 'sexy')
fn = context.gen_filter_by_body(context.cmp_contains, 'sexy', negate=True)
# Test request bodies
r = Request()
r.status_line = 'GET /sexy HTTP/1.1'
r.headers['Header'] = 'sexy'
r.raw_data = 'foo'
assert not f(r)
assert fn(r)
r.raw_data = 'sexy'
assert f(r)
assert not fn(r)
# Test response bodies
r = Request()
rsp = Response()
rsp.status_line = 'HTTP/1.1 200 OK'
rsp.headers['sexy'] = 'sexy'
r.status_line = 'GET /sexy HTTP/1.1'
r.headers['Header'] = 'sexy'
r.response = rsp
assert not f(r)
assert fn(r)
rsp.raw_data = 'sexy'
assert f(r)
assert not fn(r)
def test_filter_by_response_code(http_request):
f = context.gen_filter_by_response_code(context.cmp_eq, 200)
fn = context.gen_filter_by_response_code(context.cmp_eq, 200, negate=True)
r = Response()
http_request.response = r
r.status_line = 'HTTP/1.1 404 Not Found'
assert not f(http_request)
assert fn(http_request)
r.status_line = 'HTTP/1.1 200 OK'
assert f(http_request)
assert not fn(http_request)
def test_filter_by_raw_headers_request():
f1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:')
fn1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:', negate=True)
f2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader')
fn2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader', negate=True)
r = Request('GET / HTTP/1.1\r\n')
rsp = Response('HTTP/1.1 200 OK\r\n')
r.response = rsp
r.headers['Header'] = 'Sexy'
assert not f1(r)
assert fn1(r)
assert not f2(r)
assert fn2(r)
r = Request('GET / HTTP/1.1\r\n')
rsp = Response('HTTP/1.1 200 OK\r\n')
r.response = rsp
r.headers['Sexy'] = 'sexy'
assert f1(r)
assert not fn1(r)
assert not f2(r)
assert fn2(r)
r.headers['OtherHeader'] = 'sexy'
r.headers['Header'] = 'foo'
assert f1(r)
assert not fn1(r)
assert f2(r)
assert not fn2(r)
def test_filter_by_raw_headers_response():
f1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:')
fn1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:', negate=True)
f2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader')
fn2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader', negate=True)
r = Request('GET / HTTP/1.1\r\n')
rsp = Response('HTTP/1.1 200 OK\r\n')
r.response = rsp
rsp.headers['Header'] = 'Sexy'
assert not f1(r)
assert fn1(r)
assert not f2(r)
assert fn2(r)
r = Request('GET / HTTP/1.1\r\n')
rsp = Response('HTTP/1.1 200 OK\r\n')
r.response = rsp
rsp.headers['Sexy'] = 'sexy'
assert f1(r)
assert not fn1(r)
assert not f2(r)
assert fn2(r)
rsp.headers['OtherHeader'] = 'sexy'
rsp.headers['Header'] = 'foo'
assert f1(r)
assert not fn1(r)
assert f2(r)
assert not fn2(r)
def test_filter_by_path(http_request):
f = context.gen_filter_by_path(context.cmp_contains, 'porn') # find the fun websites
fn = context.gen_filter_by_path(context.cmp_contains, 'porn', negate=True) # find the boring websites
http_request.status_line = 'GET / HTTP/1.1'
assert not f(http_request)
assert fn(http_request)
http_request.status_line = 'GET /path/to/great/porn HTTP/1.1'
assert f(http_request)
assert not fn(http_request)
http_request.status_line = 'GET /path/to/porn/great HTTP/1.1'
assert f(http_request)
assert not fn(http_request)
def test_gen_filter_by_submitted_cookies():
f1 = context.gen_filter_by_submitted_cookies(context.cmp_contains, 'Session')
f2 = context.gen_filter_by_submitted_cookies(context.cmp_contains, 'Cookie',
context.cmp_contains, 'CookieVal')
r = Request(('GET / HTTP/1.1\r\n'
'Cookie: foo=bar\r\n'
'\r\n'))
assert not f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Cookie: Session=bar\r\n'
'\r\n'))
assert f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Cookie: Session=bar; CookieThing=NoMatch\r\n'
'\r\n'))
assert f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Cookie: Session=bar; CookieThing=CookieValue\r\n'
'\r\n'))
assert f1(r)
assert f2(r)
def test_gen_filter_by_set_cookies():
f1 = context.gen_filter_by_set_cookies(context.cmp_contains, 'Session')
f2 = context.gen_filter_by_set_cookies(context.cmp_contains, 'Cookie',
context.cmp_contains, 'CookieVal')
r = Request('GET / HTTP/1.1\r\n\r\n')
rsp = Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'\r\n'))
r.response = rsp
assert not f1(r)
assert not f2(r)
r = Request('GET / HTTP/1.1\r\n\r\n')
rsp = Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'Set-Cookie: Session=Banana\r\n'
'\r\n'))
r.response = rsp
assert f1(r)
assert not f2(r)
r = Request('GET / HTTP/1.1\r\n\r\n')
rsp = Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'Set-Cookie: Session=Banana\r\n'
'Set-Cookie: CookieThing=NoMatch\r\n'
'\r\n'))
r.response = rsp
assert f1(r)
assert not f2(r)
r = Request('GET / HTTP/1.1\r\n\r\n')
rsp = Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'Set-Cookie: Session=Banana\r\n'
'Set-Cookie: CookieThing=CookieValue\r\n'
'\r\n'))
r.response = rsp
assert f1(r)
assert f2(r)
def test_filter_by_params_get():
f1 = context.gen_filter_by_params(context.cmp_contains, 'Session')
f2 = context.gen_filter_by_params(context.cmp_contains, 'Cookie',
context.cmp_contains, 'CookieVal')
r = Request('GET / HTTP/1.1\r\n\r\n')
assert not f1(r)
assert not f2(r)
r = Request('GET /?Session=foo HTTP/1.1\r\n\r\n')
assert f1(r)
assert not f2(r)
r = Request('GET /?Session=foo&CookieThing=Fail HTTP/1.1\r\n\r\n')
assert f1(r)
assert not f2(r)
r = Request('GET /?Session=foo&CookieThing=CookieValue HTTP/1.1\r\n\r\n')
assert f1(r)
assert f2(r)
def test_filter_by_params_post():
f1 = context.gen_filter_by_params(context.cmp_contains, 'Session')
f2 = context.gen_filter_by_params(context.cmp_contains, 'Cookie',
context.cmp_contains, 'CookieVal')
r = Request(('GET / HTTP/1.1\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
r.raw_data = 'foo=bar'
assert not f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
r.raw_data = 'Session=bar'
assert f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
r.raw_data = 'Session=bar&Cookie=foo'
assert f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
r.raw_data = 'Session=bar&CookieThing=CookieValue'
assert f1(r)
assert f2(r)

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,220 @@
import pytest
import mock
import twisted.internet
import twisted.test
from pappyproxy import http
from pappyproxy import mangle
from pappyproxy.proxy import ProxyClient, ProxyClientFactory, ProxyServer
from testutil import mock_deferred, func_deleted, func_ignored_deferred, func_ignored, no_tcp
from twisted.internet.protocol import ServerFactory
from twisted.test.iosim import FakeTransport
from twisted.internet import defer, reactor
####################
## Fixtures
@pytest.fixture
def unconnected_proxyserver(mocker):
mocker.patch("twisted.test.iosim.FakeTransport.startTLS")
mocker.patch("pappyproxy.proxy.load_certs_from_dir", new=mock_generate_cert)
factory = ServerFactory()
factory.protocol = ProxyServer
protocol = factory.buildProtocol(('127.0.0.1', 0))
protocol.makeConnection(FakeTransport(protocol, True))
return protocol
@pytest.fixture
def proxyserver(mocker):
mocker.patch("twisted.test.iosim.FakeTransport.startTLS")
mocker.patch("pappyproxy.proxy.load_certs_from_dir", new=mock_generate_cert)
factory = ServerFactory()
factory.protocol = ProxyServer
protocol = factory.buildProtocol(('127.0.0.1', 0))
protocol.makeConnection(FakeTransport(protocol, True))
protocol.lineReceived('CONNECT https://www.AAAA.BBBB:443 HTTP/1.1')
protocol.lineReceived('')
protocol.transport.getOutBuffer()
return protocol
@pytest.fixture
def proxy_connection():
@defer.inlineCallbacks
def gen_connection(send_data):
factory = ProxyClientFactory(http.Request(send_data))
protocol = factory.buildProtocol(None)
tr = FakeTransport(protocol, True)
protocol.makeConnection(tr)
sent = yield protocol.data_defer
defer.returnValue((protocol, sent, factory.data_defer))
return gen_connection
## Autorun fixtures
# @pytest.fixture(autouse=True)
# def no_mangle(mocker):
# # Don't call anything in mangle.py
# mocker.patch("mangle.mangle_request", notouch_mangle_req)
# mocker.patch("mangle.mangle_response", notouch_mangle_rsp)
@pytest.fixture(autouse=True)
def ignore_save(mocker):
mocker.patch("pappyproxy.http.Request.deep_save", func_ignored_deferred)
####################
## Mock functions
def mock_generate_cert(cert_dir):
private_key = ('-----BEGIN PRIVATE KEY-----\n'
'MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDAoClrYUEB7lM0\n'
'zQaKkXZVG2d1Bu9hV8urpx0gNXMbyZ2m3xb+sKZju/FHPuWenA4KaN5gRUT+oLfv\n'
'tnF6Ia0jpRNWnX0Fyn/irdg1BWGJn7k7mJ2D0NXZQczn2+xxY05599NfGWqNKCYy\n'
'jhSwPsUK+sGJqi7aSDdlS97ZTjrQVTTFsC0+kSu4lS5fsWXxqrKLa6Ao8W7abVRO\n'
'JHazh/cxM4UKpgWU+E6yD4o4ZgHY+SMTVUh/IOM8DuOVyLEWtx4oLNiLMlpWT3qy\n'
'4IMpOF6VuU6JF2HGV13SoJfhsLXsPRbLVTAnZvJcZwtgDm6NfKapU8W8olkDV1Bf\n'
'YQEMSNX7AgMBAAECggEBAII0wUrAdrzjaIMsg9tu8FofKBPHGFDok9f4Iov/FUwX\n'
'QOXnrxeCOTb5d+L89SH9ws/ui0LwD+8+nJcA8DvqP6r0jtnhov0jIMcNVDSi6oeo\n'
'3AEY7ICJzcQJ4oRn+K+8vPNdPhfuikPYe9l4iSuJgpAlaGWyD/GlFyz12DFz2/Wu\n'
'NIcqR1ucvezRHn3eGMtvDv2WGaN4ifUc30k8XgSUesmwSI6beb5+hxq7wXfsurnP\n'
'EUrPY9ts3lfiAgxzTKOuj1VR5hn7cJyLN8jF0mZs4D6eSSHorIddhmaNiCq5ZbMd\n'
'QdlDiPvnXHT41OoXOb7tDEt7SGoiRh2noCZ1aZiSziECgYEA+tuPPLYWU6JRB6EW\n'
'PhbcXQbh3vML7eT1q7DOz0jYCojgT2+k7EWSI8T830oQyjbpe3Z86XEgH7UBjUgq\n'
'27nJ4E6dQDYGbYCKEklOoCGLE7A60i1feIz8otOQRrbQ4jcpibEgscA6gzHmunYf\n'
'De5euUgYW+Rq2Vmr6/NzUaUgui8CgYEAxJMDwPOGgiLM1cczlaSIU9Obz+cVnwWn\n'
'nsdKYMto2V3yKLydDfjsgOgzxHOxxy+5L645TPxK6CkiISuhJ93kAFFtx+1sCBCT\n'
'tVzY5robVAekxA9tlPIxtsn3+/axx3n6HnV0oA/XtxkuOS5JImgEdXqFwJZkerGE\n'
'waftIU2FCfUCgYEArl8+ErJzlJEIiCgWIPSdGuD00pfZW/TCPCT7rKRy3+fDHBR7\n'
'7Gxzp/9+0utV/mnrJBH5w/8JmGCmgoF+oRtk01FyBzdGgolN8GYajD6kwPvH917o\n'
'tRAzcC9lY3IigoxbiEWid0wqoBVoz4XaEkH2gA44OG/vQcQOOEYSi9cfh6sCgYBg\n'
'KLaOXdJvuIxRCzgNvMW/k+VFh3pJJx//COg2f2qT4mQCT3nYiutOh8hDEoFluc+y\n'
'Jlz7bvNJrE14wnn8IYxWJ383bMoLC+jlsDyeaW3S5kZQbmehk/SDwTrg86W1udKD\n'
'sdtSLU3N0LCO4jh+bzm3Ki9hrXALoOkbPoU+ZEhvPQKBgQDf79XQ3RNxZSk+eFyq\n'
'qD8ytVqxEoD+smPDflXXseVH6o+pNWrF8+A0KqmO8c+8KVzWj/OfULO6UbKd3E+x\n'
'4JGkWu9yF1lEgtHgibF2ER8zCSIL4ikOEasPCkrKj5SrS4Q+j4u5ha76dIc2CVu1\n'
'hkX2PQ1xU4ocu06k373sf73A4Q==\n'
'-----END PRIVATE KEY-----')
ca_key = ('-----BEGIN CERTIFICATE-----\n'
'MIIDjzCCAncCFQCjC8r+I4xa7JoGUJYGOTcqDROA0DANBgkqhkiG9w0BAQsFADBg\n'
'MQswCQYDVQQGEwJVUzERMA8GA1UECBMITWljaGlnYW4xEjAQBgNVBAcTCUFubiBB\n'
'cmJvcjEUMBIGA1UEChMLUGFwcHkgUHJveHkxFDASBgNVBAMTC1BhcHB5IFByb3h5\n'
'MB4XDTE1MTEyMDIxMTEzOVoXDTI1MTExNzIxMTEzOVowYDELMAkGA1UEBhMCVVMx\n'
'ETAPBgNVBAgTCE1pY2hpZ2FuMRIwEAYDVQQHEwlBbm4gQXJib3IxFDASBgNVBAoT\n'
'C1BhcHB5IFByb3h5MRQwEgYDVQQDEwtQYXBweSBQcm94eTCCASIwDQYJKoZIhvcN\n'
'AQEBBQADggEPADCCAQoCggEBAMCgKWthQQHuUzTNBoqRdlUbZ3UG72FXy6unHSA1\n'
'cxvJnabfFv6wpmO78Uc+5Z6cDgpo3mBFRP6gt++2cXohrSOlE1adfQXKf+Kt2DUF\n'
'YYmfuTuYnYPQ1dlBzOfb7HFjTnn3018Zao0oJjKOFLA+xQr6wYmqLtpIN2VL3tlO\n'
'OtBVNMWwLT6RK7iVLl+xZfGqsotroCjxbtptVE4kdrOH9zEzhQqmBZT4TrIPijhm\n'
'Adj5IxNVSH8g4zwO45XIsRa3Higs2IsyWlZPerLggyk4XpW5TokXYcZXXdKgl+Gw\n'
'tew9FstVMCdm8lxnC2AObo18pqlTxbyiWQNXUF9hAQxI1fsCAwEAAaNFMEMwEgYD\n'
'VR0TAQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0OBBYEFNo5o+5e\n'
'a0sNMlW/75VgGJCv2AcJMA0GCSqGSIb3DQEBCwUAA4IBAQBdJDhxbmoEe27bD8me\n'
'YTcLGjs/StKkSil7rLbX+tBCwtkm5UEEejBuAhKk2FuAXW8yR1FqKJSZwVCAocBT\n'
'Bo/+97Ee+h7ywrRFhATEr9D/TbbHKOjCjDzOMl9yLZa2DKErZjbI30ZD6NafWS/X\n'
'hx5X1cGohHcVVzT4jIgUEU70vvYfNn8CTZm4oJ7qqRe/uQPUYy0rwvbd60oprtGg\n'
'jNv1H5R4ODHUMBXAI9H7ft9cWrd0fBQjxhoj8pvgJXEZ52flXSqQc7qHLg1wO/zC\n'
'RUgpTcNAb2qCssBKbj+c1vKEPRUJfw6UYb0s1462rQNc8BgZiKaNbwokFmkAnjUg\n'
'AvnX\n'
'-----END CERTIFICATE-----')
return (ca_key, private_key)
def notouch_mangle_req(request, conn_id):
orig_req = http.Request(request.full_request)
orig_req.port = request.port
orig_req.is_ssl = request.is_ssl
d = mock_deferred(orig_req)
return d
def notouch_mangle_rsp(response, conn_id):
req = http.Request()
orig_rsp = http.Response(response.full_response)
req.response = orig_rsp
d = mock_deferred(req)
return d
def req_mangler_change(request, conn_id):
req = http.Request('GET /mangled HTTP/1.1\r\n\r\n')
d = mock_deferred(req)
return d
def rsp_mangler_change(request, conn_id):
req = http.Request()
rsp = http.Response('HTTP/1.1 500 MANGLED\r\n\r\n')
req.response = rsp
d = mock_deferred(req)
return d
####################
## Unit test tests
def test_proxy_server_fixture(unconnected_proxyserver):
unconnected_proxyserver.transport.write('hello')
assert unconnected_proxyserver.transport.getOutBuffer() == 'hello'
@pytest.inlineCallbacks
def test_mock_deferreds():
d = mock_deferred('Hello!')
r = yield d
assert r == 'Hello!'
def test_deleted():
with pytest.raises(NotImplementedError):
reactor.connectTCP("www.google.com", "80", ServerFactory)
with pytest.raises(NotImplementedError):
reactor.connectSSL("www.google.com", "80", ServerFactory)
####################
## Proxy Server Tests
def test_proxy_server_connect(unconnected_proxyserver, mocker):
mocker.patch("twisted.internet.reactor.connectSSL")
unconnected_proxyserver.lineReceived('CONNECT https://www.dddddd.fff:433 HTTP/1.1')
unconnected_proxyserver.lineReceived('')
assert unconnected_proxyserver.transport.getOutBuffer() == 'HTTP/1.1 200 Connection established\r\n\r\n'
assert unconnected_proxyserver._request_obj.is_ssl
def test_proxy_server_basic(proxyserver, mocker):
mocker.patch("twisted.internet.reactor.connectSSL")
mocker.patch('pappyproxy.proxy.ProxyServer.setRawMode')
proxyserver.lineReceived('GET / HTTP/1.1')
proxyserver.lineReceived('')
assert proxyserver.setRawMode.called
args, kwargs = twisted.internet.reactor.connectSSL.call_args
assert args[0] == 'www.AAAA.BBBB'
assert args[1] == 443
@pytest.inlineCallbacks
def test_proxy_client_basic(mocker, proxy_connection):
mocker.patch('pappyproxy.mangle.mangle_request', new=notouch_mangle_req)
mocker.patch('pappyproxy.mangle.mangle_response', new=notouch_mangle_rsp)
# Make the connection
(prot, sent, resp_deferred) = yield proxy_connection('GET / HTTP/1.1\r\n\r\n')
assert sent == 'GET / HTTP/1.1\r\n\r\n'
prot.lineReceived('HTTP/1.1 200 OK')
prot.lineReceived('Content-Length: 0')
prot.lineReceived('')
ret_req = yield resp_deferred
response = ret_req.response.full_response
assert response == 'HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n'
@pytest.inlineCallbacks
def test_proxy_client_mangle_req(mocker, proxy_connection):
mocker.patch('pappyproxy.mangle.mangle_request', new=req_mangler_change)
mocker.patch('pappyproxy.mangle.mangle_response', new=notouch_mangle_rsp)
# Make the connection
(prot, sent, resp_deferred) = yield proxy_connection('GET / HTTP/1.1\r\n\r\n')
assert sent == 'GET /mangled HTTP/1.1\r\n\r\n'
@pytest.inlineCallbacks
def test_proxy_client_basic(mocker, proxy_connection):
mocker.patch('pappyproxy.mangle.mangle_request', new=notouch_mangle_req)
mocker.patch('pappyproxy.mangle.mangle_response', new=rsp_mangler_change)
# Make the connection
(prot, sent, resp_deferred) = yield proxy_connection('GET / HTTP/1.1\r\n\r\n')
prot.lineReceived('HTTP/1.1 200 OK')
prot.lineReceived('Content-Length: 0')
prot.lineReceived('')
ret_req = yield resp_deferred
response = ret_req.response.full_response
assert response == 'HTTP/1.1 500 MANGLED\r\n\r\n'

View file

@ -0,0 +1,42 @@
import pytest
from twisted.internet import defer
class ClassDeleted():
pass
def func_deleted(*args, **kwargs):
raise NotImplementedError()
def func_ignored(*args, **kwargs):
pass
def func_ignored_deferred(*args, **kwargs):
return mock_deferred(None)
def mock_deferred(value):
# Generates a function that can be used to make a deferred that can be used
# to mock out deferred-returning responses
def g(data):
return value
d = defer.Deferred()
d.addCallback(g)
d.callback(None)
return d
@pytest.fixture(autouse=True)
def no_tcp(mocker):
# Don't make tcp connections
mocker.patch("twisted.internet.reactor.connectTCP", new=func_deleted)
mocker.patch("twisted.internet.reactor.connectSSL", new=func_deleted)
@pytest.fixture
def ignore_tcp(mocker):
# Don't make tcp connections
mocker.patch("twisted.internet.reactor.connectTCP", new=func_ignored)
mocker.patch("twisted.internet.reactor.connectSSL", new=func_ignored)
@pytest.fixture(autouse=True)
def no_database(mocker):
# Don't make database queries
mocker.patch("twisted.enterprise.adbapi.ConnectionPool",
new=ClassDeleted)