You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
370 lines
14 KiB
370 lines
14 KiB
import mock |
|
import pytest |
|
import twisted.internet.endpoints |
|
|
|
from pappyproxy import http |
|
from pappyproxy.proxy import MaybeTLSProtocol, start_maybe_tls, PassthroughProtocolFactory, make_proxied_connection |
|
from testutil import mock_deferred, func_deleted, TLSStringTransport, freeze, mock_int_macro, no_tcp, mock_config |
|
from pappyproxy.util import PappyStringTransport |
|
from twisted.internet import defer, ssl |
|
|
|
############################### |
|
## Helper functions and classes |
|
|
|
def gen_debug_protocol(mocker): |
|
def fake_start_tls(transport, context): |
|
transport.protocol = mock.MagicMock() |
|
mocker.patch("pappyproxy.util.PappyStringTransport.startTLS", new=fake_start_tls) |
|
mocker.patch("pappyproxy.proxy.generate_tls_context") |
|
|
|
t = PappyStringTransport() |
|
def server_data_received(s): |
|
t.write(s) |
|
factory = PassthroughProtocolFactory(server_data_received, None, None) |
|
p = factory.buildProtocol(None) |
|
p.transport = t |
|
t.protocol= p |
|
return p, t |
|
|
|
def mock_protocol_proxy(mocker): |
|
from pappyproxy import proxy |
|
mock_make_proxied_connection = mocker.patch("pappyproxy.proxy.make_proxied_connection") |
|
p = proxy.ProtocolProxy() |
|
p.client_transport = PappyStringTransport() |
|
p.server_transport = PappyStringTransport() |
|
|
|
client_protocol, _ = gen_debug_protocol(mocker) |
|
server_protocol, _ = gen_debug_protocol(mocker) |
|
|
|
return p, client_protocol, server_protocol, mock_make_proxied_connection |
|
|
|
########################## |
|
## Tests for ProtocolProxy |
|
|
|
def test_simple(mocker): |
|
mock_config(mocker) |
|
proxy, _, _, _ = mock_protocol_proxy(mocker) |
|
proxy.send_client_data("foobar") |
|
assert proxy.client_buffer == "foobar" |
|
proxy.send_server_data("barfoo") |
|
assert proxy.server_buffer == "barfoo" |
|
|
|
def test_connect(mocker): |
|
mock_config(mocker) |
|
proxy, _, _, mock_make_proxied_connection = mock_protocol_proxy(mocker) |
|
proxy.send_client_data("foobar") |
|
proxy.send_server_data("barfoo") |
|
proxy.connect("fakehost", 1337, False) |
|
assert len(mock_make_proxied_connection.mock_calls) == 1 |
|
callargs = mock_make_proxied_connection.mock_calls[0][1] |
|
assert callargs[1] == "fakehost" |
|
assert callargs[2] == 1337 |
|
assert callargs[3] == False |
|
|
|
def test_send_before_connect(mocker): |
|
mock_config(mocker) |
|
proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker) |
|
proxy.send_client_data("foobar") |
|
proxy.send_server_data("barfoo") |
|
|
|
proxy.connect("fakehost", 1337, False) |
|
|
|
proxy.client_connection_made(client_protocol) |
|
assert proxy.client_buffer == '' |
|
assert client_protocol.transport.pop_value() == 'foobar' |
|
|
|
assert proxy.server_buffer == 'barfoo' |
|
|
|
proxy.server_connection_made(server_protocol) |
|
assert proxy.server_buffer == '' |
|
assert server_protocol.transport.pop_value() == 'barfoo' |
|
|
|
def test_send_after_connect(mocker): |
|
mock_config(mocker) |
|
proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker) |
|
|
|
proxy.connect("fakehost", 1337, False) |
|
|
|
proxy.client_connection_made(client_protocol) |
|
proxy.send_client_data("foobar") |
|
assert client_protocol.transport.pop_value() == 'foobar' |
|
|
|
proxy.server_connection_made(server_protocol) |
|
proxy.send_server_data("barfoo") |
|
assert server_protocol.transport.pop_value() == 'barfoo' |
|
|
|
def test_start_tls_before_connect(mocker): |
|
mock_config(mocker) |
|
proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker) |
|
client_protocol.transport.startTLS = mock.MagicMock() |
|
server_protocol.transport.startTLS = mock.MagicMock() |
|
|
|
server_protocol.transport.startTLS.assert_not_called() |
|
client_protocol.transport.startTLS.assert_not_called() |
|
proxy.start_server_tls() |
|
proxy.start_client_tls("fakehost") |
|
|
|
server_protocol.transport.startTLS.assert_not_called() |
|
client_protocol.transport.startTLS.assert_not_called() |
|
proxy.connect("fakehost", 1337, False) |
|
server_protocol.transport.startTLS.assert_not_called() |
|
client_protocol.transport.startTLS.assert_not_called() |
|
|
|
proxy.server_connection_made(server_protocol) |
|
assert len(server_protocol.transport.startTLS.mock_calls) == 1 |
|
client_protocol.transport.startTLS.assert_not_called() |
|
|
|
proxy.client_connection_made(client_protocol) |
|
assert len(client_protocol.transport.startTLS.mock_calls) == 1 |
|
|
|
def test_start_tls_after_connect(mocker): |
|
mock_config(mocker) |
|
proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker) |
|
client_protocol.transport.startTLS = mock.MagicMock() |
|
server_protocol.transport.startTLS = mock.MagicMock() |
|
|
|
server_protocol.transport.startTLS.assert_not_called() |
|
client_protocol.transport.startTLS.assert_not_called() |
|
|
|
proxy.connect("fakehost", 1337, False) |
|
|
|
server_protocol.transport.startTLS.assert_not_called() |
|
client_protocol.transport.startTLS.assert_not_called() |
|
|
|
proxy.server_connection_made(server_protocol) |
|
proxy.start_server_tls() |
|
assert len(server_protocol.transport.startTLS.mock_calls) == 1 |
|
|
|
proxy.client_connection_made(client_protocol) |
|
proxy.start_client_tls("fakehost") |
|
assert len(client_protocol.transport.startTLS.mock_calls) == 1 |
|
|
|
def test_start_maybe_tls_before_connect(mocker): |
|
mock_config(mocker) |
|
proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker) |
|
mock_maybe_tls = mocker.patch("pappyproxy.proxy.start_maybe_tls") |
|
client_protocol.transport.startTLS = mock.MagicMock() |
|
server_protocol.transport.startTLS = mock.MagicMock() |
|
|
|
proxy.start_client_maybe_tls("fakehost") |
|
|
|
client_protocol.transport.startTLS.assert_not_called() |
|
proxy.connect("fakehost", 1337, False) |
|
client_protocol.transport.startTLS.assert_not_called() |
|
|
|
proxy.client_connection_made(client_protocol) |
|
client_protocol.transport.startTLS.assert_not_called() |
|
assert len(mock_maybe_tls.mock_calls) == 1 |
|
|
|
callargs = mock_maybe_tls.mock_calls[0][2] |
|
assert callargs['tls_host'] == 'fakehost' |
|
assert callargs['start_tls_callback'] == proxy.start_server_tls |
|
|
|
def test_start_maybe_tls_before_connect(mocker): |
|
mock_config(mocker) |
|
proxy, client_protocol, server_protocol, mock_make_proxied_connection = mock_protocol_proxy(mocker) |
|
mock_maybe_tls = mocker.patch("pappyproxy.proxy.start_maybe_tls") |
|
client_protocol.transport.startTLS = mock.MagicMock() |
|
server_protocol.transport.startTLS = mock.MagicMock() |
|
|
|
|
|
client_protocol.transport.startTLS.assert_not_called() |
|
proxy.connect("fakehost", 1337, False) |
|
client_protocol.transport.startTLS.assert_not_called() |
|
|
|
proxy.client_connection_made(client_protocol) |
|
proxy.start_client_maybe_tls("fakehost") |
|
client_protocol.transport.startTLS.assert_not_called() |
|
assert len(mock_maybe_tls.mock_calls) == 1 |
|
|
|
callargs = mock_maybe_tls.mock_calls[0][2] |
|
assert callargs['tls_host'] == 'fakehost' |
|
assert callargs['start_tls_callback'] == proxy.start_server_tls |
|
|
|
|
|
############################# |
|
## Tests for MaybeTLSProtocol |
|
def test_maybe_tls_plaintext(mocker): |
|
mock_config(mocker) |
|
tls_callback = mock.MagicMock() |
|
p, t = gen_debug_protocol(mocker) |
|
start_maybe_tls(p.transport, 'www.foo.faketld') |
|
p.dataReceived("Hello world!") |
|
assert p.transport.pop_value() == "Hello world!" |
|
|
|
def test_maybe_tls_use_tls(mocker): |
|
mock_config(mocker) |
|
tls_callback = mock.MagicMock() |
|
p, t = gen_debug_protocol(mocker) |
|
start_maybe_tls(p.transport, 'www.foo.faketld') |
|
maybe_tls_prot = t.protocol |
|
assert isinstance(maybe_tls_prot, MaybeTLSProtocol) |
|
assert maybe_tls_prot.state == MaybeTLSProtocol.STATE_DECIDING |
|
t.protocol.dataReceived("\x16") |
|
assert not isinstance(t.protocol, MaybeTLSProtocol) |
|
assert maybe_tls_prot.state == MaybeTLSProtocol.STATE_PASSTHROUGH |
|
|
|
#################################### |
|
## Tests for make_proxied_connection |
|
def test_mpc_simple(mocker): |
|
mock_config(mocker) |
|
endpoint_instance = mock.MagicMock() |
|
endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=endpoint_instance) |
|
|
|
make_proxied_connection('fakefactory', 'fakehost', 1337, False) |
|
|
|
endpointcalls = endpoint.mock_calls[0] |
|
assert endpointcalls[1][1] == 'fakehost' |
|
assert endpointcalls[1][2] == 1337 |
|
|
|
connectcall = endpoint_instance.connect |
|
assert len(connectcall.mock_calls) == 1 |
|
|
|
def test_mpc_ssl(mocker): |
|
mock_config(mocker) |
|
endpoint_instance = mock.MagicMock() |
|
endpoint = mocker.patch('twisted.internet.endpoints.SSL4ClientEndpoint', return_value=endpoint_instance) |
|
|
|
make_proxied_connection('fakefactory', 'fakehost', 1337, True) |
|
|
|
endpointcalls = endpoint.mock_calls[0] |
|
assert endpointcalls[1][1] == 'fakehost' |
|
assert endpointcalls[1][2] == 1337 |
|
assert isinstance(endpointcalls[1][3], ssl.ClientContextFactory) |
|
|
|
connectcall = endpoint_instance.connect |
|
assert len(connectcall.mock_calls) == 1 |
|
|
|
def test_mpc_socks(mocker): |
|
mock_config(mocker) |
|
tcp_endpoint_instance = mock.MagicMock() |
|
socks_endpoint_instance = mock.MagicMock() |
|
tcp_endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=tcp_endpoint_instance) |
|
socks_endpoint = mocker.patch('txsocksx.client.SOCKS5ClientEndpoint', return_value=socks_endpoint_instance) |
|
|
|
target_host = 'fakehost' |
|
target_port = 1337 |
|
|
|
socks_host = 'fakesockshost' |
|
socks_port = 1447 |
|
socks_config = {'host':socks_host, 'port':socks_port} |
|
|
|
make_proxied_connection('fakefactory', target_host, target_port, False, socks_config=socks_config) |
|
|
|
tcp_ep_calls = tcp_endpoint.mock_calls[0] |
|
assert tcp_ep_calls[1][1] == socks_host |
|
assert tcp_ep_calls[1][2] == socks_port |
|
|
|
socks_ep_calls = socks_endpoint.mock_calls[0] |
|
assert socks_ep_calls[1][0] == target_host |
|
assert socks_ep_calls[1][1] == target_port |
|
assert socks_ep_calls[1][2] is tcp_endpoint_instance |
|
assert socks_ep_calls[2]['methods'] == {'anonymous': ()} |
|
|
|
connectcall = socks_endpoint_instance.connect |
|
assert len(connectcall.mock_calls) == 1 |
|
|
|
def test_mpc_socks_creds(mocker): |
|
mock_config(mocker) |
|
tcp_endpoint_instance = mock.MagicMock() |
|
socks_endpoint_instance = mock.MagicMock() |
|
tcp_endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=tcp_endpoint_instance) |
|
socks_endpoint = mocker.patch('txsocksx.client.SOCKS5ClientEndpoint', return_value=socks_endpoint_instance) |
|
|
|
target_host = 'fakehost' |
|
target_port = 1337 |
|
|
|
socks_host = 'fakesockshost' |
|
socks_port = 1447 |
|
socks_user = 'username' |
|
socks_password = 'password' |
|
socks_config = {'host':socks_host, 'port':socks_port, |
|
'username':socks_user, 'password':socks_password} |
|
|
|
make_proxied_connection('fakefactory', target_host, target_port, False, socks_config=socks_config) |
|
|
|
tcp_ep_calls = tcp_endpoint.mock_calls[0] |
|
assert tcp_ep_calls[1][1] == socks_host |
|
assert tcp_ep_calls[1][2] == socks_port |
|
|
|
socks_ep_calls = socks_endpoint.mock_calls[0] |
|
assert socks_ep_calls[1][0] == target_host |
|
assert socks_ep_calls[1][1] == target_port |
|
assert socks_ep_calls[1][2] is tcp_endpoint_instance |
|
assert socks_ep_calls[2]['methods'] == {'login': (socks_user, socks_password), 'anonymous': ()} |
|
|
|
connectcall = socks_endpoint_instance.connect |
|
assert len(connectcall.mock_calls) == 1 |
|
|
|
def test_mpc_socks_ssl(mocker): |
|
mock_config(mocker) |
|
tcp_endpoint_instance = mock.MagicMock() |
|
socks_endpoint_instance = mock.MagicMock() |
|
wrapper_instance = mock.MagicMock() |
|
tcp_endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=tcp_endpoint_instance) |
|
socks_endpoint = mocker.patch('txsocksx.client.SOCKS5ClientEndpoint', return_value=socks_endpoint_instance) |
|
wrapper_endpoint = mocker.patch('txsocksx.tls.TLSWrapClientEndpoint', return_value=wrapper_instance) |
|
|
|
target_host = 'fakehost' |
|
target_port = 1337 |
|
|
|
socks_host = 'fakesockshost' |
|
socks_port = 1447 |
|
socks_config = {'host':socks_host, 'port':socks_port} |
|
|
|
make_proxied_connection('fakefactory', target_host, target_port, True, socks_config=socks_config) |
|
|
|
tcp_ep_calls = tcp_endpoint.mock_calls[0] |
|
assert tcp_ep_calls[1][1] == socks_host |
|
assert tcp_ep_calls[1][2] == socks_port |
|
|
|
socks_ep_calls = socks_endpoint.mock_calls[0] |
|
assert socks_ep_calls[1][0] == target_host |
|
assert socks_ep_calls[1][1] == target_port |
|
assert socks_ep_calls[1][2] is tcp_endpoint_instance |
|
assert socks_ep_calls[2]['methods'] == {'anonymous': ()} |
|
|
|
wrapper_calls = wrapper_endpoint.mock_calls[0] |
|
assert isinstance(wrapper_calls[1][0], ssl.ClientContextFactory) |
|
assert wrapper_calls[1][1] is socks_endpoint_instance |
|
|
|
connectcall = wrapper_instance.connect |
|
assert len(connectcall.mock_calls) == 1 |
|
|
|
def test_mpc_socks_ssl_creds(mocker): |
|
mock_config(mocker) |
|
tcp_endpoint_instance = mock.MagicMock() |
|
socks_endpoint_instance = mock.MagicMock() |
|
wrapper_instance = mock.MagicMock() |
|
tcp_endpoint = mocker.patch('twisted.internet.endpoints.TCP4ClientEndpoint', return_value=tcp_endpoint_instance) |
|
socks_endpoint = mocker.patch('txsocksx.client.SOCKS5ClientEndpoint', return_value=socks_endpoint_instance) |
|
wrapper_endpoint = mocker.patch('txsocksx.tls.TLSWrapClientEndpoint', return_value=wrapper_instance) |
|
|
|
target_host = 'fakehost' |
|
target_port = 1337 |
|
|
|
socks_host = 'fakesockshost' |
|
socks_port = 1447 |
|
socks_user = 'username' |
|
socks_password = 'password' |
|
socks_config = {'host':socks_host, 'port':socks_port, |
|
'username':socks_user, 'password':socks_password} |
|
|
|
make_proxied_connection('fakefactory', target_host, target_port, True, socks_config=socks_config) |
|
|
|
tcp_ep_calls = tcp_endpoint.mock_calls[0] |
|
assert tcp_ep_calls[1][1] == socks_host |
|
assert tcp_ep_calls[1][2] == socks_port |
|
|
|
socks_ep_calls = socks_endpoint.mock_calls[0] |
|
assert socks_ep_calls[1][0] == target_host |
|
assert socks_ep_calls[1][1] == target_port |
|
assert socks_ep_calls[1][2] is tcp_endpoint_instance |
|
assert socks_ep_calls[2]['methods'] == {'login': (socks_user, socks_password), 'anonymous': ()} |
|
|
|
wrapper_calls = wrapper_endpoint.mock_calls[0] |
|
assert isinstance(wrapper_calls[1][0], ssl.ClientContextFactory) |
|
assert wrapper_calls[1][1] is socks_endpoint_instance |
|
|
|
connectcall = wrapper_instance.connect |
|
assert len(connectcall.mock_calls) == 1
|
|
|