A fork of pappy proxy
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

286 lines
12 KiB

import os
import pytest
import mock
import twisted.internet
import twisted.test
from pappyproxy import http
from pappyproxy import macros
from pappyproxy import mangle
from pappyproxy import config
from pappyproxy.proxy import ProxyClient, ProxyClientFactory, ProxyServerFactory
from testutil import mock_deferred, func_deleted, func_ignored_deferred, func_ignored, no_tcp
from twisted.internet.protocol import ServerFactory
from twisted.test.iosim import FakeTransport
from twisted.internet import defer, reactor
####################
## Fixtures
MANGLED_REQ = 'GET /mangled HTTP/1.1\r\n\r\n'
MANGLED_RSP = 'HTTP/1.1 500 MANGLED\r\n\r\n'
@pytest.fixture
def unconnected_proxyserver(mocker):
mocker.patch("twisted.test.iosim.FakeTransport.startTLS")
mocker.patch("pappyproxy.proxy.load_certs_from_dir", new=mock_generate_cert)
factory = ProxyServerFactory()
protocol = factory.buildProtocol(('127.0.0.1', 0))
protocol.makeConnection(FakeTransport(protocol, True))
return protocol
@pytest.fixture
def proxyserver(mocker):
mocker.patch("twisted.test.iosim.FakeTransport.startTLS")
mocker.patch("pappyproxy.proxy.load_certs_from_dir", new=mock_generate_cert)
factory = ProxyServerFactory()
protocol = factory.buildProtocol(('127.0.0.1', 0))
protocol.makeConnection(FakeTransport(protocol, True))
protocol.lineReceived('CONNECT https://www.AAAA.BBBB:443 HTTP/1.1')
protocol.lineReceived('')
protocol.transport.getOutBuffer()
return protocol
@pytest.fixture
def proxy_connection():
@defer.inlineCallbacks
def gen_connection(send_data, new_req=False, new_rsp=False,
drop_req=False, drop_rsp=False):
factory = ProxyClientFactory(http.Request(send_data))
macro = gen_mangle_macro(new_req, new_rsp, drop_req, drop_rsp)
factory.intercepting_macros['pappy_mangle'] = macro
protocol = factory.buildProtocol(None)
tr = FakeTransport(protocol, True)
protocol.makeConnection(tr)
sent = yield protocol.data_defer
print sent
defer.returnValue((protocol, sent, factory.data_defer))
return gen_connection
@pytest.fixture
def in_scope_true(mocker):
new_in_scope = mock.MagicMock()
new_in_scope.return_value = True
mocker.patch("pappyproxy.context.in_scope", new=new_in_scope)
return new_in_scope
@pytest.fixture
def in_scope_false(mocker):
new_in_scope = mock.MagicMock()
new_in_scope.return_value = False
mocker.patch("pappyproxy.context.in_scope", new=new_in_scope)
return new_in_scope
## Autorun fixtures
@pytest.fixture(autouse=True)
def ignore_save(mocker):
mocker.patch("pappyproxy.http.Request.async_deep_save", func_ignored_deferred)
####################
## Mock functions
def mock_generate_cert(cert_dir):
private_key = ('-----BEGIN PRIVATE KEY-----\n'
'MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDAoClrYUEB7lM0\n'
'zQaKkXZVG2d1Bu9hV8urpx0gNXMbyZ2m3xb+sKZju/FHPuWenA4KaN5gRUT+oLfv\n'
'tnF6Ia0jpRNWnX0Fyn/irdg1BWGJn7k7mJ2D0NXZQczn2+xxY05599NfGWqNKCYy\n'
'jhSwPsUK+sGJqi7aSDdlS97ZTjrQVTTFsC0+kSu4lS5fsWXxqrKLa6Ao8W7abVRO\n'
'JHazh/cxM4UKpgWU+E6yD4o4ZgHY+SMTVUh/IOM8DuOVyLEWtx4oLNiLMlpWT3qy\n'
'4IMpOF6VuU6JF2HGV13SoJfhsLXsPRbLVTAnZvJcZwtgDm6NfKapU8W8olkDV1Bf\n'
'YQEMSNX7AgMBAAECggEBAII0wUrAdrzjaIMsg9tu8FofKBPHGFDok9f4Iov/FUwX\n'
'QOXnrxeCOTb5d+L89SH9ws/ui0LwD+8+nJcA8DvqP6r0jtnhov0jIMcNVDSi6oeo\n'
'3AEY7ICJzcQJ4oRn+K+8vPNdPhfuikPYe9l4iSuJgpAlaGWyD/GlFyz12DFz2/Wu\n'
'NIcqR1ucvezRHn3eGMtvDv2WGaN4ifUc30k8XgSUesmwSI6beb5+hxq7wXfsurnP\n'
'EUrPY9ts3lfiAgxzTKOuj1VR5hn7cJyLN8jF0mZs4D6eSSHorIddhmaNiCq5ZbMd\n'
'QdlDiPvnXHT41OoXOb7tDEt7SGoiRh2noCZ1aZiSziECgYEA+tuPPLYWU6JRB6EW\n'
'PhbcXQbh3vML7eT1q7DOz0jYCojgT2+k7EWSI8T830oQyjbpe3Z86XEgH7UBjUgq\n'
'27nJ4E6dQDYGbYCKEklOoCGLE7A60i1feIz8otOQRrbQ4jcpibEgscA6gzHmunYf\n'
'De5euUgYW+Rq2Vmr6/NzUaUgui8CgYEAxJMDwPOGgiLM1cczlaSIU9Obz+cVnwWn\n'
'nsdKYMto2V3yKLydDfjsgOgzxHOxxy+5L645TPxK6CkiISuhJ93kAFFtx+1sCBCT\n'
'tVzY5robVAekxA9tlPIxtsn3+/axx3n6HnV0oA/XtxkuOS5JImgEdXqFwJZkerGE\n'
'waftIU2FCfUCgYEArl8+ErJzlJEIiCgWIPSdGuD00pfZW/TCPCT7rKRy3+fDHBR7\n'
'7Gxzp/9+0utV/mnrJBH5w/8JmGCmgoF+oRtk01FyBzdGgolN8GYajD6kwPvH917o\n'
'tRAzcC9lY3IigoxbiEWid0wqoBVoz4XaEkH2gA44OG/vQcQOOEYSi9cfh6sCgYBg\n'
'KLaOXdJvuIxRCzgNvMW/k+VFh3pJJx//COg2f2qT4mQCT3nYiutOh8hDEoFluc+y\n'
'Jlz7bvNJrE14wnn8IYxWJ383bMoLC+jlsDyeaW3S5kZQbmehk/SDwTrg86W1udKD\n'
'sdtSLU3N0LCO4jh+bzm3Ki9hrXALoOkbPoU+ZEhvPQKBgQDf79XQ3RNxZSk+eFyq\n'
'qD8ytVqxEoD+smPDflXXseVH6o+pNWrF8+A0KqmO8c+8KVzWj/OfULO6UbKd3E+x\n'
'4JGkWu9yF1lEgtHgibF2ER8zCSIL4ikOEasPCkrKj5SrS4Q+j4u5ha76dIc2CVu1\n'
'hkX2PQ1xU4ocu06k373sf73A4Q==\n'
'-----END PRIVATE KEY-----')
ca_key = ('-----BEGIN CERTIFICATE-----\n'
'MIIDjzCCAncCFQCjC8r+I4xa7JoGUJYGOTcqDROA0DANBgkqhkiG9w0BAQsFADBg\n'
'MQswCQYDVQQGEwJVUzERMA8GA1UECBMITWljaGlnYW4xEjAQBgNVBAcTCUFubiBB\n'
'cmJvcjEUMBIGA1UEChMLUGFwcHkgUHJveHkxFDASBgNVBAMTC1BhcHB5IFByb3h5\n'
'MB4XDTE1MTEyMDIxMTEzOVoXDTI1MTExNzIxMTEzOVowYDELMAkGA1UEBhMCVVMx\n'
'ETAPBgNVBAgTCE1pY2hpZ2FuMRIwEAYDVQQHEwlBbm4gQXJib3IxFDASBgNVBAoT\n'
'C1BhcHB5IFByb3h5MRQwEgYDVQQDEwtQYXBweSBQcm94eTCCASIwDQYJKoZIhvcN\n'
'AQEBBQADggEPADCCAQoCggEBAMCgKWthQQHuUzTNBoqRdlUbZ3UG72FXy6unHSA1\n'
'cxvJnabfFv6wpmO78Uc+5Z6cDgpo3mBFRP6gt++2cXohrSOlE1adfQXKf+Kt2DUF\n'
'YYmfuTuYnYPQ1dlBzOfb7HFjTnn3018Zao0oJjKOFLA+xQr6wYmqLtpIN2VL3tlO\n'
'OtBVNMWwLT6RK7iVLl+xZfGqsotroCjxbtptVE4kdrOH9zEzhQqmBZT4TrIPijhm\n'
'Adj5IxNVSH8g4zwO45XIsRa3Higs2IsyWlZPerLggyk4XpW5TokXYcZXXdKgl+Gw\n'
'tew9FstVMCdm8lxnC2AObo18pqlTxbyiWQNXUF9hAQxI1fsCAwEAAaNFMEMwEgYD\n'
'VR0TAQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0OBBYEFNo5o+5e\n'
'a0sNMlW/75VgGJCv2AcJMA0GCSqGSIb3DQEBCwUAA4IBAQBdJDhxbmoEe27bD8me\n'
'YTcLGjs/StKkSil7rLbX+tBCwtkm5UEEejBuAhKk2FuAXW8yR1FqKJSZwVCAocBT\n'
'Bo/+97Ee+h7ywrRFhATEr9D/TbbHKOjCjDzOMl9yLZa2DKErZjbI30ZD6NafWS/X\n'
'hx5X1cGohHcVVzT4jIgUEU70vvYfNn8CTZm4oJ7qqRe/uQPUYy0rwvbd60oprtGg\n'
'jNv1H5R4ODHUMBXAI9H7ft9cWrd0fBQjxhoj8pvgJXEZ52flXSqQc7qHLg1wO/zC\n'
'RUgpTcNAb2qCssBKbj+c1vKEPRUJfw6UYb0s1462rQNc8BgZiKaNbwokFmkAnjUg\n'
'AvnX\n'
'-----END CERTIFICATE-----')
return (ca_key, private_key)
def gen_mangle_macro(modified_req=None, modified_rsp=None,
drop_req=False, drop_rsp=False):
macro = mock.MagicMock()
if modified_req or drop_req:
macro.async_req = True
macro.do_req = True
if drop_req:
newreq = None
else:
newreq = http.Request(modified_req)
macro.async_mangle_request.return_value = mock_deferred(newreq)
else:
macro.do_req = False
if modified_rsp or drop_rsp:
macro.async_rsp = True
macro.do_rsp = True
if drop_rsp:
newrsp = None
else:
newrsp = http.Response(modified_rsp)
macro.async_mangle_response.return_value = mock_deferred(newrsp)
else:
macro.do_rsp = False
return macro
def notouch_mangle_req(request):
d = mock_deferred(request)
return d
def notouch_mangle_rsp(request):
d = mock_deferred(request.response)
return d
def req_mangler_change(request):
req = http.Request('GET /mangled HTTP/1.1\r\n\r\n')
d = mock_deferred(req)
return d
def rsp_mangler_change(request):
rsp = http.Response('HTTP/1.1 500 MANGLED\r\n\r\n')
d = mock_deferred(rsp)
return d
def req_mangler_drop(request):
return mock_deferred(None)
def rsp_mangler_drop(request):
return mock_deferred(None)
####################
## Unit test tests
def test_proxy_server_fixture(unconnected_proxyserver):
unconnected_proxyserver.transport.write('hello')
assert unconnected_proxyserver.transport.getOutBuffer() == 'hello'
@pytest.inlineCallbacks
def test_mock_deferreds():
d = mock_deferred('Hello!')
r = yield d
assert r == 'Hello!'
def test_deleted():
with pytest.raises(NotImplementedError):
reactor.connectTCP("www.google.com", "80", ServerFactory)
with pytest.raises(NotImplementedError):
reactor.connectSSL("www.google.com", "80", ServerFactory)
####################
## Proxy Server Tests
def test_proxy_server_connect(unconnected_proxyserver, mocker, in_scope_true):
mocker.patch("twisted.internet.reactor.connectSSL")
unconnected_proxyserver.lineReceived('CONNECT https://www.dddddd.fff:433 HTTP/1.1')
unconnected_proxyserver.lineReceived('')
assert unconnected_proxyserver.transport.getOutBuffer() == 'HTTP/1.1 200 Connection established\r\n\r\n'
assert unconnected_proxyserver._request_obj.is_ssl
def test_proxy_server_basic(proxyserver, mocker, in_scope_true):
mocker.patch("twisted.internet.reactor.connectSSL")
mocker.patch('pappyproxy.proxy.ProxyServer.setRawMode')
proxyserver.lineReceived('GET / HTTP/1.1')
proxyserver.lineReceived('')
assert proxyserver.setRawMode.called
args, kwargs = twisted.internet.reactor.connectSSL.call_args
assert args[0] == 'www.AAAA.BBBB'
assert args[1] == 443
@pytest.inlineCallbacks
def test_proxy_client_nomangle(mocker, proxy_connection, in_scope_true):
# Make the connection
(prot, sent, retreq_deferred) = \
yield proxy_connection('GET / HTTP/1.1\r\n\r\n', None, None)
assert sent.full_request == 'GET / HTTP/1.1\r\n\r\n'
prot.lineReceived('HTTP/1.1 200 OK')
prot.lineReceived('Content-Length: 0')
prot.lineReceived('')
ret_req = yield retreq_deferred
response = ret_req.response.full_response
assert response == 'HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n'
@pytest.inlineCallbacks
def test_proxy_client_mangle_req(mocker, proxy_connection, in_scope_true):
# Make the connection
(prot, sent, retreq_deferred) = \
yield proxy_connection('GET / HTTP/1.1\r\n\r\n', MANGLED_REQ, None)
assert sent.full_request == 'GET /mangled HTTP/1.1\r\n\r\n'
@pytest.inlineCallbacks
def test_proxy_client_mangle_rsp(mocker, proxy_connection, in_scope_true):
# Make the connection
(prot, sent, retreq_deferred) = \
yield proxy_connection('GET / HTTP/1.1\r\n\r\n', None, MANGLED_RSP)
prot.lineReceived('HTTP/1.1 200 OK')
prot.lineReceived('Content-Length: 0')
prot.lineReceived('')
req = yield retreq_deferred
response = req.response.full_response
assert response == 'HTTP/1.1 500 MANGLED\r\n\r\n'
@pytest.inlineCallbacks
def test_proxy_drop_req(mocker, proxy_connection, in_scope_true):
(prot, sent, retreq_deferred) = \
yield proxy_connection('GET / HTTP/1.1\r\n\r\n', None, None, True, False)
assert sent is None
@pytest.inlineCallbacks
def test_proxy_drop_rsp(mocker, proxy_connection, in_scope_true):
(prot, sent, retreq_deferred) = \
yield proxy_connection('GET / HTTP/1.1\r\n\r\n', None, None, False, True)
prot.lineReceived('HTTP/1.1 200 OK')
prot.lineReceived('Content-Length: 0')
prot.lineReceived('')
retreq = yield retreq_deferred
assert retreq.response is None
@pytest.inlineCallbacks
def test_proxy_client_360_noscope(mocker, proxy_connection, in_scope_false):
# Make the connection
(prot, sent, retreq_deferred) = yield proxy_connection('GET / HTTP/1.1\r\n\r\n')
assert sent.full_request == 'GET / HTTP/1.1\r\n\r\n'
prot.lineReceived('HTTP/1.1 200 OK')
prot.lineReceived('Content-Length: 0')
prot.lineReceived('')
req = yield retreq_deferred
assert req.response.full_response == 'HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n'