Version 0.2.14

master
Rob Glew 8 years ago
parent 76d20774a5
commit cbc0b4be4c
  1. 2
      README.md
  2. 2
      pappyproxy/__init__.py
  3. 7
      pappyproxy/http.py
  4. 67
      pappyproxy/macros.py
  5. 64
      pappyproxy/proxy.py
  6. 421
      pappyproxy/tests/test_http_proxy.py
  7. 770
      pappyproxy/tests/test_proxy.py
  8. 78
      pappyproxy/tests/testutil.py

@ -1455,6 +1455,8 @@ Changelog
--------- ---------
The boring part of the readme The boring part of the readme
* 0.2.14
* Critical bugfixes
* 0.2.13 * 0.2.13
* Refactor proxy core * Refactor proxy core
* WEBSOCKETS * WEBSOCKETS

@ -1 +1 @@
__version__ = '0.2.13' __version__ = '0.2.14'

@ -3387,7 +3387,12 @@ class HTTPProtocolProxy(ProtocolProxy):
self.conn_is_ssl != use_ssl: self.conn_is_ssl != use_ssl:
self.log("Closing connection to old server") self.log("Closing connection to old server")
self.close_server_connection() self.close_server_connection()
self.connect(host, port, use_ssl, use_socks=use_socks) # we don't use SSL because maybe_use_ssl takes care of setting
# it up if we end up using it
if self.conn_is_maybe_ssl:
self.connect(host, port, False, use_socks=use_socks)
else:
self.connect(host, port, use_ssl, use_socks=use_socks)
def server_connection_lost(self, reason): def server_connection_lost(self, reason):
self.log("Connection to server lost: %s" % str(reason)) self.log("Connection to server lost: %s" % str(reason))

@ -436,6 +436,73 @@ class MacroTemplate(object):
def template_argstring(cls, template): def template_argstring(cls, template):
return cls._template_data[template][2] return cls._template_data[template][2]
## Other functions
@defer.inlineCallbacks
def async_mangle_ws(self, request, message):
if hasattr(self.source, 'async_mangle_ws'):
mangled_ws = yield self.source.async_mangle_ws(request, message)
defer.returnValue(mangled_ws)
defer.returnValue(message)
class MacroTemplate(object):
_template_data = {
'macro': ('macro.py.template',
'Generic macro template',
'[reqids]',
'macro_{fname}.py',
gen_template_args_macro),
'intmacro': ('intmacro.py.template',
'Generic intercepting macro template',
'',
'int_{fname}.py',
gen_template_generator_noargs('intmacro')),
'modheader': ('macro_header.py.template',
'Modify a header in the request and the response if it exists.',
'',
'int_{fname}.py',
gen_template_generator_noargs('modheader')),
'resubmit': ('macro_resubmit.py.template',
'Resubmit all in-context requests',
'',
'macro_{fname}.py',
gen_template_generator_noargs('resubmit')),
}
@classmethod
def fill_template(cls, template, subs):
loader = FileSystemLoader(session.config.pappy_dir+'/templates')
env = Environment(loader=loader)
template = env.get_template(cls._template_data[template][0])
return template.render(zip=zip, **subs)
@classmethod
@defer.inlineCallbacks
def fill_template_args(cls, template, args=[]):
ret = cls._template_data[template][4](args)
if isinstance(ret, defer.Deferred):
ret = yield ret
defer.returnValue(ret)
@classmethod
def template_filename(cls, template, fname):
return cls._template_data[template][3].format(fname=fname)
@classmethod
def template_list(cls):
return [k for k, v in cls._template_data.iteritems()]
@classmethod
def template_description(cls, template):
return cls._template_data[template][1]
@classmethod
def template_argstring(cls, template):
return cls._template_data[template][2]
## Other functions ## Other functions
def load_macros(loc): def load_macros(loc):

@ -307,17 +307,20 @@ class ProtocolProxy(object):
self.client_connected = False self.client_connected = False
self.client_buffer = '' self.client_buffer = ''
self.client_start_tls = False self.client_start_tls = False
self.client_tls_host = ''
self.client_protocol = None self.client_protocol = None
self.client_do_maybe_tls = False
self.server_transport = None self.server_transport = None
self.server_connected = False self.server_connected = False
self.server_buffer = '' self.server_buffer = ''
self.server_start_tls = False self.server_start_tls = False
self.conn_is_maybe_ssl = False
self.server_protocol = None self.server_protocol = None
self.conn_host = None self.conn_host = None
self.conn_port = None self.conn_port = None
self.conn_is_ssl = None self.conn_is_ssl = False
self.connection_id = get_next_connection_id() self.connection_id = get_next_connection_id()
def log(self, message, symbol='*', verbosity_level=3): def log(self, message, symbol='*', verbosity_level=3):
@ -330,23 +333,37 @@ class ProtocolProxy(object):
from pappyproxy.pappy import session from pappyproxy.pappy import session
self.connecting = True self.connecting = True
self.log("Connecting to %s:%d ssl=%s" % (host, port, use_ssl))
connect_with_ssl = use_ssl
if self.conn_is_maybe_ssl:
connect_with_ssl = False
self.log("Connecting to %s:%d ssl=%s (maybe_ssl=%s)" % (host, port, connect_with_ssl, self.conn_is_maybe_ssl))
factory = PassthroughProtocolFactory(self.server_data_received, factory = PassthroughProtocolFactory(self.server_data_received,
self.server_connection_made, self.server_connection_made,
self.server_connection_lost) self.server_connection_lost)
self.conn_host = host self.conn_host = host
self.conn_port = port self.conn_port = port
self.conn_is_ssl = use_ssl if self.conn_is_maybe_ssl:
self.conn_is_ssl = False
else:
self.conn_is_ssl = use_ssl
if use_socks: if use_socks:
socks_config = session.config.socks_proxy socks_config = session.config.socks_proxy
else: else:
socks_config = None socks_config = None
make_proxied_connection(factory, host, port, use_ssl, socks_config=socks_config, make_proxied_connection(factory, host, port, connect_with_ssl, socks_config=socks_config,
log_id=self.connection_id, http_error_transport=self.client_transport) log_id=self.connection_id, http_error_transport=self.client_transport)
## Client interactions ## Client interactions
def client_data_received(self, data):
"""
Implemented by child class
"""
pass
def send_client_data(self, data): def send_client_data(self, data):
self.log("pc< %s" % short_data(data)) self.log("pc< %s" % short_data(data))
if self.client_connected: if self.client_connected:
@ -355,12 +372,17 @@ class ProtocolProxy(object):
self.client_buffer += data self.client_buffer += data
def client_connection_made(self, protocol): def client_connection_made(self, protocol):
self.log("Client connection made")
self.client_protocol = protocol
self.client_transport = self.client_protocol.transport self.client_transport = self.client_protocol.transport
self.client_connected = True self.client_connected = True
self.connecting = False self.connecting = False
if self.client_start_tls: if self.client_start_tls:
self.start_client_tls() if self.client_do_maybe_tls:
self.start_client_maybe_tls(self.client_tls_host)
else:
self.start_client_tls(self.client_tls_host)
if self.client_buffer != '': if self.client_buffer != '':
self.client_transport.write(self.client_buffer) self.client_transport.write(self.client_buffer)
self.client_buffer = '' self.client_buffer = ''
@ -377,16 +399,25 @@ class ProtocolProxy(object):
def start_server_tls(self): def start_server_tls(self):
if self.server_connected: if self.server_connected:
self.log("Starting TLS on server transport") self.log("Starting TLS on server transport")
self.conn_is_ssl = True
self.server_transport.startTLS(ssl.ClientContextFactory()) self.server_transport.startTLS(ssl.ClientContextFactory())
else: else:
self.log("Server not yet connected, will start TLS on connect") self.log("Server not yet connected, will start TLS on connect")
self.start_server_tls = True self.server_start_tls = True
def start_client_maybe_tls(self, cert_host): def start_client_maybe_tls(self, cert_host):
ctx = generate_tls_context(cert_host) ctx = generate_tls_context(cert_host)
start_maybe_tls(self.client_transport, if self.client_connected:
tls_host=cert_host, self.log("Starting maybe TLS on client transport")
start_tls_callback=self.start_server_tls) self.conn_is_maybe_ssl = True
start_maybe_tls(self.client_transport,
tls_host=cert_host,
start_tls_callback=self.start_server_tls)
else:
self.log("Client not yet connected, will start maybe TLS on connect")
self.client_do_maybe_tls = True
self.client_start_tls = True
self.client_tls_host = cert_host
def start_client_tls(self, cert_host): def start_client_tls(self, cert_host):
if self.client_connected: if self.client_connected:
@ -395,10 +426,17 @@ class ProtocolProxy(object):
self.client_transport.startTLS(ctx) self.client_transport.startTLS(ctx)
else: else:
self.log("Client not yet connected, will start TLS on connect") self.log("Client not yet connected, will start TLS on connect")
self.start_client_tls = True self.client_start_tls = True
self.client_tls_host = cert_host
## Server interactions ## Server interactions
def server_data_received(self, data):
"""
Implemented by child class
"""
pass
def send_server_data(self, data): def send_server_data(self, data):
if self.server_connected: if self.server_connected:
self.log("ps> %s" % short_data(data)) self.log("ps> %s" % short_data(data))
@ -412,7 +450,7 @@ class ProtocolProxy(object):
""" """
self.server_protocol must be set before calling self.server_protocol must be set before calling
""" """
self.log("Connection made") self.log("Server connection made")
self.server_protocol = protocol self.server_protocol = protocol
self.server_transport = protocol.transport self.server_transport = protocol.transport
self.server_connected = True self.server_connected = True
@ -441,6 +479,7 @@ class ProtocolProxy(object):
self.server_transport = None self.server_transport = None
self.server_connected = False self.server_connected = False
self.server_buffer = '' self.server_buffer = ''
self.server_start_tls = False
self.server_protocol = None self.server_protocol = None
def close_client_connection(self): def close_client_connection(self):
@ -450,7 +489,10 @@ class ProtocolProxy(object):
self.client_transport = None self.client_transport = None
self.client_connected = False self.client_connected = False
self.client_buffer = '' self.client_buffer = ''
self.client_start_tls = False
self.client_tls_host = ''
self.client_protocol = None self.client_protocol = None
self.client_do_maybe_tls = False
def close_connections(self): def close_connections(self):
self.close_server_connection() self.close_server_connection()

@ -0,0 +1,421 @@
import pytest
import mock
import random
import datetime
import pappyproxy
import base64
import collections
from pappyproxy import http
from pappyproxy.proxy import ProxyClientFactory, ProxyServerFactory, UpstreamHTTPProxyClient
from pappyproxy.http import Request, Response
from pappyproxy.macros import InterceptMacro
from testutil import mock_deferred, func_deleted, TLSStringTransport, freeze, mock_int_macro, no_tcp
from twisted.internet import defer
class InterceptMacroTest(InterceptMacro):
def __init__(self, new_req=None, new_rsp=None):
InterceptMacro.__init__(self)
self.new_req = None
self.new_rsp = None
if new_req:
self.intercept_requests = True
self.new_req = new_req
if new_rsp:
self.intercept_responses = True
self.new_rsp = new_rsp
def mangle_request(self, request):
if self.intercept_requests:
return self.new_req
else:
return request
def mangle_response(self, request):
if self.intercept_responses:
return self.new_rsp
else:
return request.response
class TestProxyConnection(object):
@property
def client_protocol(self):
if 'protocol' not in self.conn_info:
raise Exception('Connection to server not made. Cannot write data as server.')
return self.conn_info['protocol']
@property
def client_factory(self):
if 'protocol' not in self.conn_info:
raise Exception('Connection to server not made. Cannot write data as server.')
return self.conn_info['factory']
def setUp(self, mocker, int_macros={}, socks_config=None, http_config=None, in_scope=True):
self.mocker = mocker
self.conn_info = {}
# Mock config
self.mock_config = pappyproxy.config.PappyConfig()
self.mock_config.socks_proxy = socks_config
self.mock_config.http_proxy = http_config
self.mock_session = pappyproxy.pappy.PappySession(self.mock_config)
mocker.patch.object(pappyproxy.pappy, 'session', new=self.mock_session)
mocker.patch("pappyproxy.proxy.load_certs_from_dir", new=mock_generate_cert)
# Listening server
## self.server_factory = ProxyServerFactory()
## self.server_factory.save_all = True
## self.server_factory.intercepting_macros = int_macros
## self.server_protocol = self.server_factory.buildProtocol(('127.0.0.1', 0))
## self.server_transport = TLSStringTransport()
## self.server_protocol.makeConnection(self.server_transport)
# Other mocks
self.req_save = mocker.patch.object(pappyproxy.http.Request, 'async_deep_save', autospec=True, side_effect=mock_req_async_save)
self.submit_request = mocker.patch('pappyproxy.http.Request.submit_request',
new=self.gen_mock_submit_request())
self.make_proxied_connection = mocker.patch('pappyproxy.proxy.make_proxied_connection')
self.in_scope = mocker.patch('pappyproxy.context.in_scope').return_value = in_scope
def gen_mock_submit_request(self):
orig = Request.submit_request
def f(request, save_request=False, intercepting_macros={}, stream_transport=None):
return orig(request, save_request=save_request,
intercepting_macros=intercepting_macros,
stream_transport=stream_transport,
_factory_string_transport=True,
_conn_info=self.conn_info)
return f
def perform_connect_request(self):
self.write_as_browser('CONNECT https://www.AAAA.BBBB:443 HTTP/1.1\r\n\r\n')
assert self.read_as_browser() == 'HTTP/1.1 200 Connection established\r\n\r\n'
def write_as_browser(self, data):
self.server_protocol.dataReceived(data)
def read_as_browser(self):
s = self.server_protocol.transport.value()
self.server_protocol.transport.clear()
return s
def write_as_server(self, data):
self.client_protocol.dataReceived(data)
def read_as_server(self):
s = self.client_protocol.transport.value()
self.client_protocol.transport.clear()
return s
def mock_req_async_save(req):
req.reqid = str(random.randint(1,1000000))
return mock_deferred()
def mock_mangle_response_side_effect(new_rsp):
def f(request, mangle_macros):
request.response = new_rsp
return mock_deferred(True)
return f
def mock_generate_cert(cert_dir):
private_key = ('-----BEGIN PRIVATE KEY-----\n'
'MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDAoClrYUEB7lM0\n'
'zQaKkXZVG2d1Bu9hV8urpx0gNXMbyZ2m3xb+sKZju/FHPuWenA4KaN5gRUT+oLfv\n'
'tnF6Ia0jpRNWnX0Fyn/irdg1BWGJn7k7mJ2D0NXZQczn2+xxY05599NfGWqNKCYy\n'
'jhSwPsUK+sGJqi7aSDdlS97ZTjrQVTTFsC0+kSu4lS5fsWXxqrKLa6Ao8W7abVRO\n'
'JHazh/cxM4UKpgWU+E6yD4o4ZgHY+SMTVUh/IOM8DuOVyLEWtx4oLNiLMlpWT3qy\n'
'4IMpOF6VuU6JF2HGV13SoJfhsLXsPRbLVTAnZvJcZwtgDm6NfKapU8W8olkDV1Bf\n'
'YQEMSNX7AgMBAAECggEBAII0wUrAdrzjaIMsg9tu8FofKBPHGFDok9f4Iov/FUwX\n'
'QOXnrxeCOTb5d+L89SH9ws/ui0LwD+8+nJcA8DvqP6r0jtnhov0jIMcNVDSi6oeo\n'
'3AEY7ICJzcQJ4oRn+K+8vPNdPhfuikPYe9l4iSuJgpAlaGWyD/GlFyz12DFz2/Wu\n'
'NIcqR1ucvezRHn3eGMtvDv2WGaN4ifUc30k8XgSUesmwSI6beb5+hxq7wXfsurnP\n'
'EUrPY9ts3lfiAgxzTKOuj1VR5hn7cJyLN8jF0mZs4D6eSSHorIddhmaNiCq5ZbMd\n'
'QdlDiPvnXHT41OoXOb7tDEt7SGoiRh2noCZ1aZiSziECgYEA+tuPPLYWU6JRB6EW\n'
'PhbcXQbh3vML7eT1q7DOz0jYCojgT2+k7EWSI8T830oQyjbpe3Z86XEgH7UBjUgq\n'
'27nJ4E6dQDYGbYCKEklOoCGLE7A60i1feIz8otOQRrbQ4jcpibEgscA6gzHmunYf\n'
'De5euUgYW+Rq2Vmr6/NzUaUgui8CgYEAxJMDwPOGgiLM1cczlaSIU9Obz+cVnwWn\n'
'nsdKYMto2V3yKLydDfjsgOgzxHOxxy+5L645TPxK6CkiISuhJ93kAFFtx+1sCBCT\n'
'tVzY5robVAekxA9tlPIxtsn3+/axx3n6HnV0oA/XtxkuOS5JImgEdXqFwJZkerGE\n'
'waftIU2FCfUCgYEArl8+ErJzlJEIiCgWIPSdGuD00pfZW/TCPCT7rKRy3+fDHBR7\n'
'7Gxzp/9+0utV/mnrJBH5w/8JmGCmgoF+oRtk01FyBzdGgolN8GYajD6kwPvH917o\n'
'tRAzcC9lY3IigoxbiEWid0wqoBVoz4XaEkH2gA44OG/vQcQOOEYSi9cfh6sCgYBg\n'
'KLaOXdJvuIxRCzgNvMW/k+VFh3pJJx//COg2f2qT4mQCT3nYiutOh8hDEoFluc+y\n'
'Jlz7bvNJrE14wnn8IYxWJ383bMoLC+jlsDyeaW3S5kZQbmehk/SDwTrg86W1udKD\n'
'sdtSLU3N0LCO4jh+bzm3Ki9hrXALoOkbPoU+ZEhvPQKBgQDf79XQ3RNxZSk+eFyq\n'
'qD8ytVqxEoD+smPDflXXseVH6o+pNWrF8+A0KqmO8c+8KVzWj/OfULO6UbKd3E+x\n'
'4JGkWu9yF1lEgtHgibF2ER8zCSIL4ikOEasPCkrKj5SrS4Q+j4u5ha76dIc2CVu1\n'
'hkX2PQ1xU4ocu06k373sf73A4Q==\n'
'-----END PRIVATE KEY-----')
ca_key = ('-----BEGIN CERTIFICATE-----\n'
'MIIDjzCCAncCFQCjC8r+I4xa7JoGUJYGOTcqDROA0DANBgkqhkiG9w0BAQsFADBg\n'
'MQswCQYDVQQGEwJVUzERMA8GA1UECBMITWljaGlnYW4xEjAQBgNVBAcTCUFubiBB\n'
'cmJvcjEUMBIGA1UEChMLUGFwcHkgUHJveHkxFDASBgNVBAMTC1BhcHB5IFByb3h5\n'
'MB4XDTE1MTEyMDIxMTEzOVoXDTI1MTExNzIxMTEzOVowYDELMAkGA1UEBhMCVVMx\n'
'ETAPBgNVBAgTCE1pY2hpZ2FuMRIwEAYDVQQHEwlBbm4gQXJib3IxFDASBgNVBAoT\n'
'C1BhcHB5IFByb3h5MRQwEgYDVQQDEwtQYXBweSBQcm94eTCCASIwDQYJKoZIhvcN\n'
'AQEBBQADggEPADCCAQoCggEBAMCgKWthQQHuUzTNBoqRdlUbZ3UG72FXy6unHSA1\n'
'cxvJnabfFv6wpmO78Uc+5Z6cDgpo3mBFRP6gt++2cXohrSOlE1adfQXKf+Kt2DUF\n'
'YYmfuTuYnYPQ1dlBzOfb7HFjTnn3018Zao0oJjKOFLA+xQr6wYmqLtpIN2VL3tlO\n'
'OtBVNMWwLT6RK7iVLl+xZfGqsotroCjxbtptVE4kdrOH9zEzhQqmBZT4TrIPijhm\n'
'Adj5IxNVSH8g4zwO45XIsRa3Higs2IsyWlZPerLggyk4XpW5TokXYcZXXdKgl+Gw\n'
'tew9FstVMCdm8lxnC2AObo18pqlTxbyiWQNXUF9hAQxI1fsCAwEAAaNFMEMwEgYD\n'
'VR0TAQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0OBBYEFNo5o+5e\n'
'a0sNMlW/75VgGJCv2AcJMA0GCSqGSIb3DQEBCwUAA4IBAQBdJDhxbmoEe27bD8me\n'
'YTcLGjs/StKkSil7rLbX+tBCwtkm5UEEejBuAhKk2FuAXW8yR1FqKJSZwVCAocBT\n'
'Bo/+97Ee+h7ywrRFhATEr9D/TbbHKOjCjDzOMl9yLZa2DKErZjbI30ZD6NafWS/X\n'
'hx5X1cGohHcVVzT4jIgUEU70vvYfNn8CTZm4oJ7qqRe/uQPUYy0rwvbd60oprtGg\n'
'jNv1H5R4ODHUMBXAI9H7ft9cWrd0fBQjxhoj8pvgJXEZ52flXSqQc7qHLg1wO/zC\n'
'RUgpTcNAb2qCssBKbj+c1vKEPRUJfw6UYb0s1462rQNc8BgZiKaNbwokFmkAnjUg\n'
'AvnX\n'
'-----END CERTIFICATE-----')
return (ca_key, private_key)
########
## Tests
def test_no_tcp():
from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint
from txsocksx.client import SOCKS5ClientEndpoint
from txsocksx.tls import TLSWrapClientEndpoint
with pytest.raises(NotImplementedError):
SSL4ClientEndpoint('aasdfasdf.sdfwerqwer')
with pytest.raises(NotImplementedError):
TCP4ClientEndpoint('aasdfasdf.sdfwerqwer')
with pytest.raises(NotImplementedError):
SOCKS5ClientEndpoint('aasdfasdf.sdfwerqwer')
with pytest.raises(NotImplementedError):
TLSWrapClientEndpoint('asdf.2341')
def test_proxy_server_connect(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:443 HTTP/1.1\r\n\r\n')
rsp = proxy.read_as_browser()
print rsp
assert rsp == 'HTTP/1.1 200 Connection established\r\n\r\n'
def test_proxy_server_forward_basic(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'Host: www.AAAA.BBBB\r\n'
'\r\n'
'ABCD')
rsp_contents = ('HTTP/1.1 200 OK\r\n\r\n')
proxy.write_as_browser(req_contents)
assert proxy.read_as_server() == req_contents
proxy.write_as_server(rsp_contents)
assert proxy.read_as_browser() == rsp_contents
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80, False, socks_config=None, use_http_proxy=True)
assert proxy.req_save.called
def test_proxy_server_forward_basic_ssl(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
proxy.perform_connect_request()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
rsp_contents = ('HTTP/1.1 200 OK\r\n\r\n')
proxy.write_as_browser(req_contents)
assert proxy.read_as_server() == req_contents
proxy.write_as_server(rsp_contents)
assert proxy.read_as_browser() == rsp_contents
assert proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 443, True, socks_config=None, use_http_proxy=True)
def test_proxy_server_connect_uri(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:443 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
assert proxy.client_protocol.transport.startTLS.called
assert proxy.client_factory.request.host == 'www.AAAA.BBBB'
assert proxy.client_factory.request.port == 443
assert proxy.client_factory.request.is_ssl == True
assert proxy.read_as_server() == req_contents
assert proxy.client_protocol.transport.startTLS.called
assert proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 443, True, socks_config=None, use_http_proxy=True)
def test_proxy_server_connect_uri_alt_port(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:80085 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
assert proxy.client_factory.request.host == 'www.AAAA.BBBB'
assert proxy.client_factory.request.port == 80085
assert proxy.client_factory.request.is_ssl == True
assert proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80085, True, socks_config=None, use_http_proxy=True)
def test_proxy_server_socks_basic(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker, socks_config={'host': 'www.banana.faketld', 'port': 1337})
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:80085 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80085, True,
socks_config={'host':'www.banana.faketld', 'port':1337},
use_http_proxy=True)
def test_proxy_server_http_basic(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker, http_config={'host': 'www.banana.faketld', 'port': 1337})
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:80085 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
assert proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80085, True,
socks_config=None,
use_http_proxy=True)
def test_proxy_server_360_noscope(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker, in_scope=False, socks_config={'host': 'www.banana.faketld', 'port': 1337})
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:80085 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
assert not proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80085, True,
socks_config=None,
use_http_proxy=False)
def test_proxy_server_macro_simple(mocker):
proxy = TestProxyConnection()
new_req_contents = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n'
new_rsp_contents = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n'
new_req = Request(new_req_contents)
new_rsp = Response(new_rsp_contents)
test_macro = InterceptMacroTest(new_req=new_req, new_rsp=new_rsp)
proxy.setUp(mocker, int_macros={'test_macro': test_macro})
proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n')
assert proxy.read_as_server() == new_req_contents
proxy.write_as_server('HTTP/1.1 404 NOT FOUND\r\n\r\n')
assert proxy.read_as_browser() == new_rsp_contents
def test_proxy_server_macro_multiple(mocker):
proxy = TestProxyConnection()
new_req_contents1 = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n'
new_rsp_contents1 = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n'
new_req1 = Request(new_req_contents1)
new_rsp1 = Response(new_rsp_contents1)
new_req_contents2 = 'GET / HTTP/1.1\r\nMangled: Very very yes\r\n\r\n'
new_rsp_contents2 = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very very yes\r\n\r\n'
new_req2 = Request(new_req_contents2)
new_rsp2 = Response(new_rsp_contents2)
test_macro1 = InterceptMacroTest(new_req=new_req1, new_rsp=new_rsp1)
test_macro2 = InterceptMacroTest(new_req=new_req2, new_rsp=new_rsp2)
macros = collections.OrderedDict()
macros['macro1'] = test_macro1
macros['macro2'] = test_macro2
proxy.setUp(mocker, int_macros=macros)
proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n')
assert proxy.read_as_server() == new_req_contents2
proxy.write_as_server('HTTP/1.1 404 NOT FOUND\r\n\r\n')
assert proxy.read_as_browser() == new_rsp_contents2
def test_proxy_server_macro_360_noscope(mocker):
proxy = TestProxyConnection()
new_req_contents = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n'
new_rsp_contents = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n'
new_req = Request(new_req_contents)
new_rsp = Response(new_rsp_contents)
test_macro = InterceptMacroTest(new_req=new_req, new_rsp=new_rsp)
proxy.setUp(mocker, int_macros={'test_macro': test_macro}, in_scope=False)
proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n')
assert proxy.read_as_server() == 'GET /serious.php HTTP/1.1\r\n\r\n'
proxy.write_as_server('HTTP/1.1 404 NOT FOUND\r\n\r\n')
assert proxy.read_as_browser() == 'HTTP/1.1 404 NOT FOUND\r\n\r\n'
def test_proxy_server_stream_simple(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'Host: www.AAAA.BBBB\r\n'
'\r\n'
'ABCD')
rsp_contents = ('HTTP/1.1 200 OK\r\n\r\n')
proxy.write_as_browser(req_contents)
assert proxy.read_as_server() == req_contents
proxy.write_as_server(rsp_contents[:20])
assert proxy.read_as_browser() == rsp_contents[:20]
proxy.write_as_server(rsp_contents[20:])
assert proxy.read_as_browser() == rsp_contents[20:]
def test_proxy_server_macro_stream(mocker):
proxy = TestProxyConnection()
new_req_contents = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n'
new_rsp_contents = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n'
new_req = Request(new_req_contents)
new_rsp = Response(new_rsp_contents)
test_macro = InterceptMacroTest(new_req=new_req, new_rsp=new_rsp)
proxy.setUp(mocker, int_macros={'test_macro': test_macro})
proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n')
assert proxy.read_as_server() == new_req_contents
proxy.write_as_server('HTTP/1.1 404 ')
assert proxy.read_as_browser() == ''
proxy.write_as_server('NOT FOUND\r\n\r\n')
assert proxy.read_as_browser() == new_rsp_contents
# It doesn't stream if out of scope and macros are active, but whatever.
# def test_proxy_server_macro_stream_360_noscope(mocker):
# proxy = TestProxyConnection()
# new_req_contents = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n'
# new_rsp_contents = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n'
# new_req = Request(new_req_contents)
# new_rsp = Response(new_rsp_contents)
# test_macro = InterceptMacroTest(new_req=new_req, new_rsp=new_rsp)
# proxy.setUp(mocker, int_macros={'test_macro': test_macro}, in_scope=False)
# proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n')
# assert proxy.read_as_server() == 'GET /serious.php HTTP/1.1\r\n\r\n'
# proxy.write_as_server('HTTP/1.1 404 ')
# assert proxy.read_as_browser() == 'HTTP/1.1 404 '
# proxy.write_as_server('NOT FOUND\r\n\r\n')
# assert proxy.read_as_browser() == 'NOT FOUND\r\n\r\n'

@ -1,422 +1,370 @@
import pytest
import mock import mock
import random import pytest
import datetime import twisted.internet.endpoints
import pappyproxy
import base64
import collections
from pappyproxy import http from pappyproxy import http
from pappyproxy.proxy import ProxyClientFactory, ProxyServerFactory, UpstreamHTTPProxyClient from pappyproxy.proxy import MaybeTLSProtocol, start_maybe_tls, PassthroughProtocolFactory, make_proxied_connection
from pappyproxy.http import Request, Response from testutil import mock_deferred, func_deleted, TLSStringTransport, freeze, mock_int_macro, no_tcp, mock_config
from pappyproxy.macros import InterceptMacro from pappyproxy.util import PappyStringTransport
from testutil import mock_deferred, func_deleted, TLSStringTransport, freeze, mock_int_macro, no_tcp from twisted.internet import defer, ssl
from twisted.internet import defer
###############################
class InterceptMacroTest(InterceptMacro): ## Helper functions and classes
def __init__(self, new_req=None, new_rsp=None): def gen_debug_protocol(mocker):
InterceptMacro.__init__(self) def fake_start_tls(transport, context):
transport.protocol = mock.MagicMock()
self.new_req = None mocker.patch("pappyproxy.util.PappyStringTransport.startTLS", new=fake_start_tls)
self.new_rsp = None mocker.patch("pappyproxy.proxy.generate_tls_context")
if new_req:
self.intercept_requests = True t = PappyStringTransport()
self.new_req = new_req def server_data_received(s):
if new_rsp: t.write(s)
self.intercept_responses = True factory = PassthroughProtocolFactory(server_data_received, None, None)
self.new_rsp = new_rsp p = factory.buildProtocol(None)
p.transport = t
def mangle_request(self, request): t.protocol= p
if self.intercept_requests: return p, t
return self.new_req
else: def mock_protocol_proxy(mocker):
return request from pappyproxy import proxy
mock_make_proxied_connection = mocker.patch("pappyproxy.proxy.make_proxied_connection")
def mangle_response(self, request): p = proxy.ProtocolProxy()
if self.intercept_responses: p.client_transport = PappyStringTransport()
return self.new_rsp p.server_transport = PappyStringTransport()
else:
return request.response client_protocol, _ = gen_debug_protocol(mocker)
server_protocol, _ = gen_debug_protocol(mocker)
class TestProxyConnection(object):
return p, client_protocol, server_protocol, mock_make_proxied_connection
@property
def client_protocol(self): ##########################
if 'protocol' not in self.conn_info: ## Tests for ProtocolProxy
raise Exception('Connection to server not made. Cannot write data as server.')
return self.conn_info['protocol'] def test_simple(mocker):
mock_config(mocker)
@property proxy, _, _, _ = mock_protocol_proxy(mocker)
def client_factory(self): proxy.send_client_data("foobar")
if 'protocol' not in self.conn_info: assert proxy.client_buffer == "foobar"
raise Exception('Connection to server not made. Cannot write data as server.') proxy.send_server_data("barfoo")
return self.conn_info['factory'] assert proxy.server_buffer == "barfoo"
def test_connect(mocker):
mock_config(mocker)
proxy, _, _, mock_make_proxied_connection = mock_protocol_proxy(mocker)
proxy.send_client_data("foobar")
proxy.send_server_data("barfoo")
proxy.connect("fakehost", 1337, False)
assert len(mock_make_proxied_connection.mock_calls) == 1
callargs = mock_make_proxied_connection.mock_calls[0][1]
assert callargs[1] == "fakehost"
assert callargs[2] == 1337
assert callargs[3] == False
def setUp(self, mocker, int_macros={}, socks_config=None, http_config=None, in_scope=True): def test_send_before_connect(mocker):
self.mocker = mocker mock_config(mocker)
self.conn_info = {} proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker)
proxy.send_client_data("foobar")
# Mock config proxy.send_server_data("barfoo")
self.mock_config = pappyproxy.config.PappyConfig()
self.mock_config.socks_proxy = socks_config proxy.connect("fakehost", 1337, False)
self.mock_config.http_proxy = http_config
self.mock_session = pappyproxy.pappy.PappySession(self.mock_config)
mocker.patch.object(pappyproxy.pappy, 'session', new=self.mock_session)
mocker.patch("pappyproxy.proxy.load_certs_from_dir", new=mock_generate_cert)
# Listening server
self.server_factory = ProxyServerFactory()
self.server_factory.save_all = True
self.server_factory.intercepting_macros = int_macros
self.server_protocol = self.server_factory.buildProtocol(('127.0.0.1', 0))
self.server_transport = TLSStringTransport()
self.server_protocol.makeConnection(self.server_transport)
# Other mocks
self.req_save = mocker.patch.object(pappyproxy.http.Request, 'async_deep_save', autospec=True, side_effect=mock_req_async_save)
self.submit_request = mocker.patch('pappyproxy.http.Request.submit_request',
new=self.gen_mock_submit_request())
self.get_endpoint = mocker.patch('pappyproxy.proxy.get_endpoint')
self.in_scope = mocker.patch('pappyproxy.context.in_scope').return_value = in_scope
def gen_mock_submit_request(self):
orig = Request.submit_request
def f(request, save_request=False, intercepting_macros={}, stream_transport=None):
return orig(request, save_request=save_request,
intercepting_macros=intercepting_macros,
stream_transport=stream_transport,
_factory_string_transport=True,
_conn_info=self.conn_info)
return f
def perform_connect_request(self):
self.write_as_browser('CONNECT https://www.AAAA.BBBB:443 HTTP/1.1\r\n\r\n')
assert self.read_as_browser() == 'HTTP/1.1 200 Connection established\r\n\r\n'
def write_as_browser(self, data): proxy.client_connection_made(client_protocol)
self.server_protocol.dataReceived(data) assert proxy.client_buffer == ''
assert client_protocol.transport.pop_value() == 'foobar'
def read_as_browser(self): assert proxy.server_buffer == 'barfoo'
s = self.server_protocol.transport.value()
self.server_protocol.transport.clear()
return s
def write_as_server(self, data): proxy.server_connection_made(server_protocol)
self.client_protocol.dataReceived(data) assert proxy.server_buffer == ''
assert server_protocol.transport.pop_value() == 'barfoo'
def read_as_server(self): def test_send_after_connect(mocker):
s = self.client_protocol.transport.value() mock_config(mocker)
self.client_protocol.transport.clear() proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker)
return s
proxy.connect("fakehost", 1337, False)
proxy.client_connection_made(client_protocol)
proxy.send_client_data("foobar")
assert client_protocol.transport.pop_value() == 'foobar'
proxy.server_connection_made(server_protocol)
proxy.send_server_data("barfoo")
assert server_protocol.transport.pop_value() == 'barfoo'
def mock_req_async_save(req): def test_start_tls_before_connect(mocker):
req.reqid = str(random.randint(1,1000000)) mock_config(mocker)
return mock_deferred() proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker)
client_protocol.transport.startTLS = mock.MagicMock()
def mock_mangle_response_side_effect(new_rsp): server_protocol.transport.startTLS = mock.MagicMock()
def f(request, mangle_macros):
request.response = new_rsp server_protocol.transport.startTLS.assert_not_called()
return mock_deferred(True) client_protocol.transport.startTLS.assert_not_called()
return f proxy.start_server_tls()
proxy.start_client_tls("fakehost")
def mock_generate_cert(cert_dir):
private_key = ('-----BEGIN PRIVATE KEY-----\n' server_protocol.transport.startTLS.assert_not_called()
'MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDAoClrYUEB7lM0\n' client_protocol.transport.startTLS.assert_not_called()
'zQaKkXZVG2d1Bu9hV8urpx0gNXMbyZ2m3xb+sKZju/FHPuWenA4KaN5gRUT+oLfv\n' proxy.connect("fakehost", 1337, False)
'tnF6Ia0jpRNWnX0Fyn/irdg1BWGJn7k7mJ2D0NXZQczn2+xxY05599NfGWqNKCYy\n' server_protocol.transport.startTLS.assert_not_called()
'jhSwPsUK+sGJqi7aSDdlS97ZTjrQVTTFsC0+kSu4lS5fsWXxqrKLa6Ao8W7abVRO\n' client_protocol.transport.startTLS.assert_not_called()
'JHazh/cxM4UKpgWU+E6yD4o4ZgHY+SMTVUh/IOM8DuOVyLEWtx4oLNiLMlpWT3qy\n'
'4IMpOF6VuU6JF2HGV13SoJfhsLXsPRbLVTAnZvJcZwtgDm6NfKapU8W8olkDV1Bf\n' proxy.server_connection_made(server_protocol)
'YQEMSNX7AgMBAAECggEBAII0wUrAdrzjaIMsg9tu8FofKBPHGFDok9f4Iov/FUwX\n' assert len(server_protocol.transport.startTLS.mock_calls) == 1
'QOXnrxeCOTb5d+L89SH9ws/ui0LwD+8+nJcA8DvqP6r0jtnhov0jIMcNVDSi6oeo\n' client_protocol.transport.startTLS.assert_not_called()
'3AEY7ICJzcQJ4oRn+K+8vPNdPhfuikPYe9l4iSuJgpAlaGWyD/GlFyz12DFz2/Wu\n'
'NIcqR1ucvezRHn3eGMtvDv2WGaN4ifUc30k8XgSUesmwSI6beb5+hxq7wXfsurnP\n' proxy.client_connection_made(client_protocol)
'EUrPY9ts3lfiAgxzTKOuj1VR5hn7cJyLN8jF0mZs4D6eSSHorIddhmaNiCq5ZbMd\n' assert len(client_protocol.transport.startTLS.mock_calls) == 1
'QdlDiPvnXHT41OoXOb7tDEt7SGoiRh2noCZ1aZiSziECgYEA+tuPPLYWU6JRB6EW\n'
'PhbcXQbh3vML7eT1q7DOz0jYCojgT2+k7EWSI8T830oQyjbpe3Z86XEgH7UBjUgq\n' def test_start_tls_after_connect(mocker):
'27nJ4E6dQDYGbYCKEklOoCGLE7A60i1feIz8otOQRrbQ4jcpibEgscA6gzHmunYf\n' mock_config(mocker)
'De5euUgYW+Rq2Vmr6/NzUaUgui8CgYEAxJMDwPOGgiLM1cczlaSIU9Obz+cVnwWn\n' proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker)
'nsdKYMto2V3yKLydDfjsgOgzxHOxxy+5L645TPxK6CkiISuhJ93kAFFtx+1sCBCT\n' client_protocol.transport.startTLS = mock.MagicMock()
'tVzY5robVAekxA9tlPIxtsn3+/axx3n6HnV0oA/XtxkuOS5JImgEdXqFwJZkerGE\n' server_protocol.transport.startTLS = mock.MagicMock()
'waftIU2FCfUCgYEArl8+ErJzlJEIiCgWIPSdGuD00pfZW/TCPCT7rKRy3+fDHBR7\n'
'7Gxzp/9+0utV/mnrJBH5w/8JmGCmgoF+oRtk01FyBzdGgolN8GYajD6kwPvH917o\n' server_protocol.transport.startTLS.assert_not_called()
'tRAzcC9lY3IigoxbiEWid0wqoBVoz4XaEkH2gA44OG/vQcQOOEYSi9cfh6sCgYBg\n' client_protocol.transport.startTLS.assert_not_called()
'KLaOXdJvuIxRCzgNvMW/k+VFh3pJJx//COg2f2qT4mQCT3nYiutOh8hDEoFluc+y\n'
'Jlz7bvNJrE14wnn8IYxWJ383bMoLC+jlsDyeaW3S5kZQbmehk/SDwTrg86W1udKD\n' proxy.connect("fakehost", 1337, False)
'sdtSLU3N0LCO4jh+bzm3Ki9hrXALoOkbPoU+ZEhvPQKBgQDf79XQ3RNxZSk+eFyq\n'
'qD8ytVqxEoD+smPDflXXseVH6o+pNWrF8+A0KqmO8c+8KVzWj/OfULO6UbKd3E+x\n' server_protocol.transport.startTLS.assert_not_called()
'4JGkWu9yF1lEgtHgibF2ER8zCSIL4ikOEasPCkrKj5SrS4Q+j4u5ha76dIc2CVu1\n' client_protocol.transport.startTLS.assert_not_called()
'hkX2PQ1xU4ocu06k373sf73A4Q==\n'
'-----END PRIVATE KEY-----') proxy.server_connection_made(server_protocol)
ca_key = ('-----BEGIN CERTIFICATE-----\n' proxy.start_server_tls()
'MIIDjzCCAncCFQCjC8r+I4xa7JoGUJYGOTcqDROA0DANBgkqhkiG9w0BAQsFADBg\n' assert len(server_protocol.transport.startTLS.mock_calls) == 1
'MQswCQYDVQQGEwJVUzERMA8GA1UECBMITWljaGlnYW4xEjAQBgNVBAcTCUFubiBB\n'
'cmJvcjEUMBIGA1UEChMLUGFwcHkgUHJveHkxFDASBgNVBAMTC1BhcHB5IFByb3h5\n' proxy.client_connection_made(client_protocol)
'MB4XDTE1MTEyMDIxMTEzOVoXDTI1MTExNzIxMTEzOVowYDELMAkGA1UEBhMCVVMx\n' proxy.start_client_tls("fakehost")
'ETAPBgNVBAgTCE1pY2hpZ2FuMRIwEAYDVQQHEwlBbm4gQXJib3IxFDASBgNVBAoT\n' assert len(client_protocol.transport.startTLS.mock_calls) == 1
'C1BhcHB5IFByb3h5MRQwEgYDVQQDEwtQYXBweSBQcm94eTCCASIwDQYJKoZIhvcN\n'
'AQEBBQADggEPADCCAQoCggEBAMCgKWthQQHuUzTNBoqRdlUbZ3UG72FXy6unHSA1\n' def test_start_maybe_tls_before_connect(mocker):
'cxvJnabfFv6wpmO78Uc+5Z6cDgpo3mBFRP6gt++2cXohrSOlE1adfQXKf+Kt2DUF\n' mock_config(mocker)
'YYmfuTuYnYPQ1dlBzOfb7HFjTnn3018Zao0oJjKOFLA+xQr6wYmqLtpIN2VL3tlO\n' proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker)
'OtBVNMWwLT6RK7iVLl+xZfGqsotroCjxbtptVE4kdrOH9zEzhQqmBZT4TrIPijhm\n' mock_maybe_tls = mocker.patch("pappyproxy.proxy.start_maybe_tls")
'Adj5IxNVSH8g4zwO45XIsRa3Higs2IsyWlZPerLggyk4XpW5TokXYcZXXdKgl+Gw\n' client_protocol.transport.startTLS = mock.MagicMock()
'tew9FstVMCdm8lxnC2AObo18pqlTxbyiWQNXUF9hAQxI1fsCAwEAAaNFMEMwEgYD\n' server_protocol.transport.startTLS = mock.MagicMock()
'VR0TAQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0OBBYEFNo5o+5e\n'
'a0sNMlW/75VgGJCv2AcJMA0GCSqGSIb3DQEBCwUAA4IBAQBdJDhxbmoEe27bD8me\n' proxy.start_client_maybe_tls("fakehost")
'YTcLGjs/StKkSil7rLbX+tBCwtkm5UEEejBuAhKk2FuAXW8yR1FqKJSZwVCAocBT\n'
'Bo/+97Ee+h7ywrRFhATEr9D/TbbHKOjCjDzOMl9yLZa2DKErZjbI30ZD6NafWS/X\n' client_protocol.transport.startTLS.assert_not_called()
'hx5X1cGohHcVVzT4jIgUEU70vvYfNn8CTZm4oJ7qqRe/uQPUYy0rwvbd60oprtGg\n' proxy.connect("fakehost", 1337, False)
'jNv1H5R4ODHUMBXAI9H7ft9cWrd0fBQjxhoj8pvgJXEZ52flXSqQc7qHLg1wO/zC\n' client_protocol.transport.startTLS.assert_not_called()
'RUgpTcNAb2qCssBKbj+c1vKEPRUJfw6UYb0s1462rQNc8BgZiKaNbwokFmkAnjUg\n'
'AvnX\n' proxy.client_connection_made(client_protocol)
'-----END CERTIFICATE-----') client_protocol.transport.startTLS.assert_not_called()
return (ca_key, private_key) assert len(mock_maybe_tls.mock_calls) == 1
######## callargs = mock_maybe_tls.mock_calls[0][2]
## Tests assert callargs['tls_host'] == 'fakehost'
assert callargs['start_tls_callback'] == proxy.start_server_tls
def test_no_tcp():
from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint def test_start_maybe_tls_before_connect(mocker):
from txsocksx.client import SOCKS5ClientEndpoint mock_config(mocker)
from txsocksx.tls import TLSWrapClientEndpoint proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker)
with pytest.raises(NotImplementedError): mock_maybe_tls = mocker.patch("pappyproxy.proxy.start_maybe_tls")
SSL4ClientEndpoint('aasdfasdf.sdfwerqwer') client_protocol.transport.startTLS = mock.MagicMock()
with pytest.raises(NotImplementedError): server_protocol.transport.startTLS = mock.MagicMock()
TCP4ClientEndpoint('aasdfasdf.sdfwerqwer')
with pytest.raises(NotImplementedError):
SOCKS5ClientEndpoint('aasdfasdf.sdfwerqwer') client_protocol.transport.startTLS.assert_not_called()
with pytest.raises(NotImplementedError): proxy.connect("fakehost", 1337, False)
TLSWrapClientEndpoint('asdf.2341') client_protocol.transport.startTLS.assert_not_called()
def test_proxy_server_connect(mocker): proxy.client_connection_made(client_protocol)
proxy = TestProxyConnection() proxy.start_client_maybe_tls("fakehost")
proxy.setUp(mocker) client_protocol.transport.startTLS.assert_not_called()
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:443 HTTP/1.1\r\n\r\n') assert len(mock_maybe_tls.mock_calls) == 1
rsp = proxy.read_as_browser()
print rsp callargs = mock_maybe_tls.mock_calls[0][2]
assert rsp == 'HTTP/1.1 200 Connection established\r\n\r\n' assert callargs['tls_host'] == 'fakehost'
assert callargs['start_tls_callback'] == proxy.start_server_tls
def test_proxy_server_forward_basic(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'Host: www.AAAA.BBBB\r\n'
'\r\n'
'ABCD')
rsp_contents = ('HTTP/1.1 200 OK\r\n\r\n')
proxy.write_as_browser(req_contents)
assert proxy.read_as_server() == req_contents
proxy.write_as_server(rsp_contents)
assert proxy.read_as_browser() == rsp_contents
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80, False, socks_config=None, use_http_proxy=True)
assert proxy.req_save.called
def test_proxy_server_forward_basic_ssl(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
proxy.perform_connect_request()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
rsp_contents = ('HTTP/1.1 200 OK\r\n\r\n')
proxy.write_as_browser(req_contents)
assert proxy.read_as_server() == req_contents
proxy.write_as_server(rsp_contents)
assert proxy.read_as_browser() == rsp_contents
assert proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 443, True, socks_config=None, use_http_proxy=True)
def test_proxy_server_connect_uri(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:443 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
assert proxy.client_protocol.transport.startTLS.called
assert proxy.client_factory.request.host == 'www.AAAA.BBBB'
assert proxy.client_factory.request.port == 443
assert proxy.client_factory.request.is_ssl == True
assert proxy.read_as_server() == req_contents
assert proxy.client_protocol.transport.startTLS.called
assert proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 443, True, socks_config=None, use_http_proxy=True)
def test_proxy_server_connect_uri_alt_port(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker)
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:80085 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
assert proxy.client_factory.request.host == 'www.AAAA.BBBB'
assert proxy.client_factory.request.port == 80085
assert proxy.client_factory.request.is_ssl == True
assert proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80085, True, socks_config=None, use_http_proxy=True)
def test_proxy_server_socks_basic(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker, socks_config={'host': 'www.banana.faketld', 'port': 1337})
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:80085 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80085, True,
socks_config={'host':'www.banana.faketld', 'port':1337},
use_http_proxy=True)
def test_proxy_server_http_basic(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker, http_config={'host': 'www.banana.faketld', 'port': 1337})
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:80085 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
assert proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80085, True,
socks_config=None,
use_http_proxy=True)
def test_proxy_server_360_noscope(mocker):
proxy = TestProxyConnection()
proxy.setUp(mocker, in_scope=False, socks_config={'host': 'www.banana.faketld', 'port': 1337})
proxy.write_as_browser('CONNECT https://www.AAAA.BBBB:80085 HTTP/1.1\r\n\r\n')
proxy.read_as_browser()
req_contents = ('POST /fooo HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Content-Length: 4\r\n'
'\r\n'
'ABCD')
proxy.write_as_browser(req_contents)
assert not proxy.req_save.called
proxy.get_endpoint.assert_called_with('www.AAAA.BBBB', 80085, True,
socks_config=None,
use_http_proxy=False)
def test_proxy_server_macro_simple(mocker):
proxy = TestProxyConnection() #############################
## Tests for MaybeTLSProtocol
new_req_contents = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n' def test_maybe_tls_plaintext(mocker):
new_rsp_contents = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n' mock_config(mocker)
new_req = Request(new_req_contents) tls_callback = mock.MagicMock()
new_rsp = Response(new_rsp_contents) p, t = gen_debug_protocol(mocker)
test_macro = InterceptMacroTest(new_req=new_req, new_rsp=new_rsp) start_maybe_tls(p.transport, 'www.foo.faketld')
proxy.setUp(mocker, int_macros={'test_macro': test_macro}) p.dataReceived("Hello world!")
proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n') assert p.transport.pop_value() == "Hello world!"
assert proxy.read_as_server() == new_req_contents
proxy.write_as_server('HTTP/1.1 404 NOT FOUND\r\n\r\n') def test_maybe_tls_use_tls(mocker):
assert proxy.read_as_browser() == new_rsp_contents mock_config(mocker)
tls_callback = mock.MagicMock()
def test_proxy_server_macro_multiple(mocker): p, t = gen_debug_protocol(mocker)
proxy = TestProxyConnection() start_maybe_tls(p.transport, 'www.foo.faketld')
maybe_tls_prot = t.protocol
new_req_contents1 = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n' assert isinstance(maybe_tls_prot, MaybeTLSProtocol)
new_rsp_contents1 = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n' assert maybe_tls_prot.state == MaybeTLSProtocol.STATE_DECIDING
new_req1 = Request(new_req_contents1) t.protocol.dataReceived("\x16")
new_rsp1 = Response(new_rsp_contents1) assert not isinstance(t.protocol, MaybeTLSProtocol)
assert maybe_tls_prot.state == MaybeTLSProtocol.STATE_PASSTHROUGH
new_req_contents2 = 'GET / HTTP/1.1\r\nMangled: Very very yes\r\n\r\n'
new_rsp_contents2 = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very very yes\r\n\r\n' ####################################
new_req2 = Request(new_req_contents2) ## Tests for make_proxied_connection
new_rsp2 = Response(new_rsp_contents2) def test_mpc_simple(mocker):
mock_config(mocker)
test_macro1 = InterceptMacroTest(new_req=new_req1, new_rsp=new_rsp1) endpoint_instance = mock.MagicMock()
test_macro2 = InterceptMacroTest(new_req=new_req2, new_rsp=new_rsp2) endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=endpoint_instance)
macros = collections.OrderedDict() make_proxied_connection('fakefactory', 'fakehost', 1337, False)
macros['macro1'] = test_macro1
macros['macro2'] = test_macro2 endpointcalls = endpoint.mock_calls[0]
assert endpointcalls[1][1] == 'fakehost'
proxy.setUp(mocker, int_macros=macros) assert endpointcalls[1][2] == 1337
proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n')
assert proxy.read_as_server() == new_req_contents2 connectcall = endpoint_instance.connect
proxy.write_as_server('HTTP/1.1 404 NOT FOUND\r\n\r\n') assert len(connectcall.mock_calls) == 1
assert proxy.read_as_browser() == new_rsp_contents2
def test_mpc_ssl(mocker):
def test_proxy_server_macro_360_noscope(mocker): mock_config(mocker)
proxy = TestProxyConnection() endpoint_instance = mock.MagicMock()
endpoint = mocker.patch('twisted.internet.endpoints.SSL4ClientEndpoint', return_value=endpoint_instance)
new_req_contents = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n'
new_rsp_contents = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n' make_proxied_connection('fakefactory', 'fakehost', 1337, True)
new_req = Request(new_req_contents)
new_rsp = Response(new_rsp_contents) endpointcalls = endpoint.mock_calls[0]
test_macro = InterceptMacroTest(new_req=new_req, new_rsp=new_rsp) assert endpointcalls[1][1] == 'fakehost'
proxy.setUp(mocker, int_macros={'test_macro': test_macro}, in_scope=False) assert endpointcalls[1][2] == 1337
proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n') assert isinstance(endpointcalls[1][3], ssl.ClientContextFactory)
assert proxy.read_as_server() == 'GET /serious.php HTTP/1.1\r\n\r\n'
proxy.write_as_server('HTTP/1.1 404 NOT FOUND\r\n\r\n') connectcall = endpoint_instance.connect
assert proxy.read_as_browser() == 'HTTP/1.1 404 NOT FOUND\r\n\r\n' assert len(connectcall.mock_calls) == 1
def test_proxy_server_stream_simple(mocker): def test_mpc_socks(mocker):
proxy = TestProxyConnection() mock_config(mocker)
proxy.setUp(mocker) tcp_endpoint_instance = mock.MagicMock()
req_contents = ('POST /fooo HTTP/1.1\r\n' socks_endpoint_instance = mock.MagicMock()
'Test-Header: foo\r\n' tcp_endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=tcp_endpoint_instance)
'Content-Length: 4\r\n' socks_endpoint = mocker.patch('txsocksx.client.SOCKS5ClientEndpoint', return_value=socks_endpoint_instance)
'Host: www.AAAA.BBBB\r\n'
'\r\n' target_host = 'fakehost'
'ABCD') target_port = 1337
rsp_contents = ('HTTP/1.1 200 OK\r\n\r\n')
proxy.write_as_browser(req_contents) socks_host = 'fakesockshost'
assert proxy.read_as_server() == req_contents socks_port = 1447
proxy.write_as_server(rsp_contents[:20]) socks_config = {'host':socks_host, 'port':socks_port}
assert proxy.read_as_browser() == rsp_contents[:20]
proxy.write_as_server(rsp_contents[20:]) make_proxied_connection('fakefactory', target_host, target_port, False, socks_config=socks_config)
assert proxy.read_as_browser() == rsp_contents[20:]
tcp_ep_calls = tcp_endpoint.mock_calls[0]
def test_proxy_server_macro_stream(mocker): assert tcp_ep_calls[1][1] == socks_host
proxy = TestProxyConnection() assert tcp_ep_calls[1][2] == socks_port
new_req_contents = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n' socks_ep_calls = socks_endpoint.mock_calls[0]
new_rsp_contents = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n' assert socks_ep_calls[1][0] == target_host
new_req = Request(new_req_contents) assert socks_ep_calls[1][1] == target_port
new_rsp = Response(new_rsp_contents) assert socks_ep_calls[1][2] is tcp_endpoint_instance
test_macro = InterceptMacroTest(new_req=new_req, new_rsp=new_rsp) assert socks_ep_calls[2]['methods'] == {'anonymous': ()}
proxy.setUp(mocker, int_macros={'test_macro': test_macro})
proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n') connectcall = socks_endpoint_instance.connect
assert proxy.read_as_server() == new_req_contents assert len(connectcall.mock_calls) == 1
proxy.write_as_server('HTTP/1.1 404 ')
assert proxy.read_as_browser() == '' def test_mpc_socks_creds(mocker):
proxy.write_as_server('NOT FOUND\r\n\r\n') mock_config(mocker)
assert proxy.read_as_browser() == new_rsp_contents tcp_endpoint_instance = mock.MagicMock()
socks_endpoint_instance = mock.MagicMock()
# It doesn't stream if out of scope and macros are active, but whatever. tcp_endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=tcp_endpoint_instance)
# def test_proxy_server_macro_stream_360_noscope(mocker): socks_endpoint = mocker.patch('txsocksx.client.SOCKS5ClientEndpoint', return_value=socks_endpoint_instance)
# proxy = TestProxyConnection()
target_host = 'fakehost'
# new_req_contents = 'GET / HTTP/1.1\r\nMangled: Very yes\r\n\r\n' target_port = 1337
# new_rsp_contents = 'HTTP/1.1 200 OKILIE DOKILIE\r\nMangled: Very yes\r\n\r\n'
# new_req = Request(new_req_contents) socks_host = 'fakesockshost'
# new_rsp = Response(new_rsp_contents) socks_port = 1447
# test_macro = InterceptMacroTest(new_req=new_req, new_rsp=new_rsp) socks_user = 'username'
# proxy.setUp(mocker, int_macros={'test_macro': test_macro}, in_scope=False) socks_password = 'password'
# proxy.write_as_browser('GET /serious.php HTTP/1.1\r\n\r\n') socks_config = {'host':socks_host, 'port':socks_port,
# assert proxy.read_as_server() == 'GET /serious.php HTTP/1.1\r\n\r\n' 'username':socks_user, 'password':socks_password}
# proxy.write_as_server('HTTP/1.1 404 ')
# assert proxy.read_as_browser() == 'HTTP/1.1 404 ' make_proxied_connection('fakefactory', target_host, target_port, False, socks_config=socks_config)
# proxy.write_as_server('NOT FOUND\r\n\r\n')
# assert proxy.read_as_browser() == 'NOT FOUND\r\n\r\n' tcp_ep_calls = tcp_endpoint.mock_calls[0]
assert tcp_ep_calls[1][1] == socks_host
assert tcp_ep_calls[1][2] == socks_port
socks_ep_calls = socks_endpoint.mock_calls[0]
assert socks_ep_calls[1][0] == target_host
assert socks_ep_calls[1][1] == target_port
assert socks_ep_calls[1][2] is tcp_endpoint_instance
assert socks_ep_calls[2]['methods'] == {'login': (socks_user, socks_password), 'anonymous': ()}
connectcall = socks_endpoint_instance.connect
assert len(connectcall.mock_calls) == 1
def test_mpc_socks_ssl(mocker):
mock_config(mocker)
tcp_endpoint_instance = mock.MagicMock()
socks_endpoint_instance = mock.MagicMock()
wrapper_instance = mock.MagicMock()
tcp_endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=tcp_endpoint_instance)
socks_endpoint = mocker.patch('txsocksx.client.SOCKS5ClientEndpoint', return_value=socks_endpoint_instance)
wrapper_endpoint = mocker.patch('txsocksx.tls.TLSWrapClientEndpoint', return_value=wrapper_instance)
target_host = 'fakehost'
target_port = 1337
socks_host = 'fakesockshost'
socks_port = 1447
socks_config = {'host':socks_host, 'port':socks_port}
make_proxied_connection('fakefactory', target_host, target_port, True, socks_config=socks_config)
tcp_ep_calls = tcp_endpoint.mock_calls[0]
assert tcp_ep_calls[1][1] == socks_host
assert tcp_ep_calls[1][2] == socks_port
socks_ep_calls = socks_endpoint.mock_calls[0]
assert socks_ep_calls[1][0] == target_host
assert socks_ep_calls[1][1] == target_port
assert socks_ep_calls[1][2] is tcp_endpoint_instance
assert socks_ep_calls[2]['methods'] == {'anonymous': ()}
wrapper_calls = wrapper_endpoint.mock_calls[0]
assert isinstance(wrapper_calls[1][0], ssl.ClientContextFactory)
assert wrapper_calls[1][1] is socks_endpoint_instance
connectcall = wrapper_instance.connect
assert len(connectcall.mock_calls) == 1
def test_mpc_socks_ssl_creds(mocker):
mock_config(mocker)
tcp_endpoint_instance = mock.MagicMock()
socks_endpoint_instance = mock.MagicMock()
wrapper_instance = mock.MagicMock()
tcp_endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=tcp_endpoint_instance)
socks_endpoint = mocker.patch('txsocksx.client.SOCKS5ClientEndpoint', return_value=socks_endpoint_instance)
wrapper_endpoint = mocker.patch('txsocksx.tls.TLSWrapClientEndpoint', return_value=wrapper_instance)
target_host = 'fakehost'
target_port = 1337
socks_host = 'fakesockshost'
socks_port = 1447
socks_user = 'username'
socks_password = 'password'
socks_config = {'host':socks_host, 'port':socks_port,
'username':socks_user, 'password':socks_password}
make_proxied_connection('fakefactory', target_host, target_port, True, socks_config=socks_config)
tcp_ep_calls = tcp_endpoint.mock_calls[0]
assert tcp_ep_calls[1][1] == socks_host
assert tcp_ep_calls[1][2] == socks_port
socks_ep_calls = socks_endpoint.mock_calls[0]
assert socks_ep_calls[1][0] == target_host
assert socks_ep_calls[1][1] == target_port
assert socks_ep_calls[1][2] is tcp_endpoint_instance
assert socks_ep_calls[2]['methods'] == {'login': (socks_user, socks_password), 'anonymous': ()}
wrapper_calls = wrapper_endpoint.mock_calls[0]
assert isinstance(wrapper_calls[1][0], ssl.ClientContextFactory)
assert wrapper_calls[1][1] is socks_endpoint_instance
connectcall = wrapper_instance.connect
assert len(connectcall.mock_calls) == 1

@ -4,7 +4,7 @@ import pytest
import StringIO import StringIO
from twisted.internet import defer from twisted.internet import defer
from twisted.test.proto_helpers import StringTransport from twisted.test.proto_helpers import StringTransport
from pappyproxy import http from pappyproxy import http, config, pappy
next_mock_id = 0 next_mock_id = 0
@ -56,12 +56,13 @@ def mock_deferred(value=None):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def no_tcp(mocker): def no_tcp(mocker):
# Don't make tcp connections # Don't make tcp connections
mocker.patch("twisted.internet.reactor.connectTCP", new=func_deleted) #mocker.patch("twisted.internet.reactor.connectTCP", new=func_deleted)
mocker.patch("twisted.internet.reactor.connectSSL", new=func_deleted) #mocker.patch("twisted.internet.reactor.connectSSL", new=func_deleted)
mocker.patch("twisted.internet.endpoints.SSL4ClientEndpoint", new=func_deleted) #mocker.patch("twisted.internet.endpoints.SSL4ClientEndpoint", new=func_deleted)
mocker.patch("twisted.internet.endpoints.TCP4ClientEndpoint", new=func_deleted) #mocker.patch("twisted.internet.endpoints.TCP4ClientEndpoint", new=func_deleted)
mocker.patch("txsocksx.client.SOCKS5ClientEndpoint", new=func_deleted) #mocker.patch("txsocksx.client.SOCKS5ClientEndpoint", new=func_deleted)
mocker.patch("txsocksx.tls.TLSWrapClientEndpoint", new=func_deleted) #mocker.patch("txsocksx.tls.TLSWrapClientEndpoint", new=func_deleted)
pass
@pytest.fixture @pytest.fixture
def ignore_tcp(mocker): def ignore_tcp(mocker):
@ -170,3 +171,66 @@ def mock_int_macro(modified_req=None, modified_rsp=None,
else: else:
macro.intercept_responses = False macro.intercept_responses = False
return macro return macro
def mock_generate_cert(cert_dir):
private_key = ('-----BEGIN PRIVATE KEY-----\n'
'MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDAoClrYUEB7lM0\n'
'zQaKkXZVG2d1Bu9hV8urpx0gNXMbyZ2m3xb+sKZju/FHPuWenA4KaN5gRUT+oLfv\n'
'tnF6Ia0jpRNWnX0Fyn/irdg1BWGJn7k7mJ2D0NXZQczn2+xxY05599NfGWqNKCYy\n'
'jhSwPsUK+sGJqi7aSDdlS97ZTjrQVTTFsC0+kSu4lS5fsWXxqrKLa6Ao8W7abVRO\n'
'JHazh/cxM4UKpgWU+E6yD4o4ZgHY+SMTVUh/IOM8DuOVyLEWtx4oLNiLMlpWT3qy\n'
'4IMpOF6VuU6JF2HGV13SoJfhsLXsPRbLVTAnZvJcZwtgDm6NfKapU8W8olkDV1Bf\n'
'YQEMSNX7AgMBAAECggEBAII0wUrAdrzjaIMsg9tu8FofKBPHGFDok9f4Iov/FUwX\n'
'QOXnrxeCOTb5d+L89SH9ws/ui0LwD+8+nJcA8DvqP6r0jtnhov0jIMcNVDSi6oeo\n'
'3AEY7ICJzcQJ4oRn+K+8vPNdPhfuikPYe9l4iSuJgpAlaGWyD/GlFyz12DFz2/Wu\n'
'NIcqR1ucvezRHn3eGMtvDv2WGaN4ifUc30k8XgSUesmwSI6beb5+hxq7wXfsurnP\n'
'EUrPY9ts3lfiAgxzTKOuj1VR5hn7cJyLN8jF0mZs4D6eSSHorIddhmaNiCq5ZbMd\n'
'QdlDiPvnXHT41OoXOb7tDEt7SGoiRh2noCZ1aZiSziECgYEA+tuPPLYWU6JRB6EW\n'
'PhbcXQbh3vML7eT1q7DOz0jYCojgT2+k7EWSI8T830oQyjbpe3Z86XEgH7UBjUgq\n'
'27nJ4E6dQDYGbYCKEklOoCGLE7A60i1feIz8otOQRrbQ4jcpibEgscA6gzHmunYf\n'
'De5euUgYW+Rq2Vmr6/NzUaUgui8CgYEAxJMDwPOGgiLM1cczlaSIU9Obz+cVnwWn\n'
'nsdKYMto2V3yKLydDfjsgOgzxHOxxy+5L645TPxK6CkiISuhJ93kAFFtx+1sCBCT\n'
'tVzY5robVAekxA9tlPIxtsn3+/axx3n6HnV0oA/XtxkuOS5JImgEdXqFwJZkerGE\n'
'waftIU2FCfUCgYEArl8+ErJzlJEIiCgWIPSdGuD00pfZW/TCPCT7rKRy3+fDHBR7\n'
'7Gxzp/9+0utV/mnrJBH5w/8JmGCmgoF+oRtk01FyBzdGgolN8GYajD6kwPvH917o\n'
'tRAzcC9lY3IigoxbiEWid0wqoBVoz4XaEkH2gA44OG/vQcQOOEYSi9cfh6sCgYBg\n'
'KLaOXdJvuIxRCzgNvMW/k+VFh3pJJx//COg2f2qT4mQCT3nYiutOh8hDEoFluc+y\n'
'Jlz7bvNJrE14wnn8IYxWJ383bMoLC+jlsDyeaW3S5kZQbmehk/SDwTrg86W1udKD\n'
'sdtSLU3N0LCO4jh+bzm3Ki9hrXALoOkbPoU+ZEhvPQKBgQDf79XQ3RNxZSk+eFyq\n'
'qD8ytVqxEoD+smPDflXXseVH6o+pNWrF8+A0KqmO8c+8KVzWj/OfULO6UbKd3E+x\n'
'4JGkWu9yF1lEgtHgibF2ER8zCSIL4ikOEasPCkrKj5SrS4Q+j4u5ha76dIc2CVu1\n'
'hkX2PQ1xU4ocu06k373sf73A4Q==\n'
'-----END PRIVATE KEY-----')
ca_key = ('-----BEGIN CERTIFICATE-----\n'
'MIIDjzCCAncCFQCjC8r+I4xa7JoGUJYGOTcqDROA0DANBgkqhkiG9w0BAQsFADBg\n'
'MQswCQYDVQQGEwJVUzERMA8GA1UECBMITWljaGlnYW4xEjAQBgNVBAcTCUFubiBB\n'
'cmJvcjEUMBIGA1UEChMLUGFwcHkgUHJveHkxFDASBgNVBAMTC1BhcHB5IFByb3h5\n'
'MB4XDTE1MTEyMDIxMTEzOVoXDTI1MTExNzIxMTEzOVowYDELMAkGA1UEBhMCVVMx\n'
'ETAPBgNVBAgTCE1pY2hpZ2FuMRIwEAYDVQQHEwlBbm4gQXJib3IxFDASBgNVBAoT\n'
'C1BhcHB5IFByb3h5MRQwEgYDVQQDEwtQYXBweSBQcm94eTCCASIwDQYJKoZIhvcN\n'
'AQEBBQADggEPADCCAQoCggEBAMCgKWthQQHuUzTNBoqRdlUbZ3UG72FXy6unHSA1\n'
'cxvJnabfFv6wpmO78Uc+5Z6cDgpo3mBFRP6gt++2cXohrSOlE1adfQXKf+Kt2DUF\n'
'YYmfuTuYnYPQ1dlBzOfb7HFjTnn3018Zao0oJjKOFLA+xQr6wYmqLtpIN2VL3tlO\n'
'OtBVNMWwLT6RK7iVLl+xZfGqsotroCjxbtptVE4kdrOH9zEzhQqmBZT4TrIPijhm\n'
'Adj5IxNVSH8g4zwO45XIsRa3Higs2IsyWlZPerLggyk4XpW5TokXYcZXXdKgl+Gw\n'
'tew9FstVMCdm8lxnC2AObo18pqlTxbyiWQNXUF9hAQxI1fsCAwEAAaNFMEMwEgYD\n'
'VR0TAQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0OBBYEFNo5o+5e\n'
'a0sNMlW/75VgGJCv2AcJMA0GCSqGSIb3DQEBCwUAA4IBAQBdJDhxbmoEe27bD8me\n'
'YTcLGjs/StKkSil7rLbX+tBCwtkm5UEEejBuAhKk2FuAXW8yR1FqKJSZwVCAocBT\n'
'Bo/+97Ee+h7ywrRFhATEr9D/TbbHKOjCjDzOMl9yLZa2DKErZjbI30ZD6NafWS/X\n'
'hx5X1cGohHcVVzT4jIgUEU70vvYfNn8CTZm4oJ7qqRe/uQPUYy0rwvbd60oprtGg\n'
'jNv1H5R4ODHUMBXAI9H7ft9cWrd0fBQjxhoj8pvgJXEZ52flXSqQc7qHLg1wO/zC\n'
'RUgpTcNAb2qCssBKbj+c1vKEPRUJfw6UYb0s1462rQNc8BgZiKaNbwokFmkAnjUg\n'
'AvnX\n'
'-----END CERTIFICATE-----')
return (ca_key, private_key)
def mock_config(mocker, http_config=None, socks_config=None):
# Mock config
mock_config = config.PappyConfig()
mock_config.socks_proxy = socks_config
mock_config.http_proxy = http_config
mock_session = pappy.PappySession(mock_config)
mocker.patch.object(pappy, 'session', new=mock_session)
mocker.patch("pappyproxy.proxy.load_certs_from_dir", new=mock_generate_cert)

Loading…
Cancel
Save