Initial release

This commit is contained in:
Rob Glew 2015-11-17 19:27:41 -06:00
parent 03fabf16e8
commit f6ebcd271b
33 changed files with 5028 additions and 0 deletions

View file

View file

@ -0,0 +1,402 @@
import pytest
import context
from http import Request, Response, ResponseCookie
@pytest.fixture
def http_request():
return Request('GET / HTTP/1.1\r\n')
def test_filter_reqs():
pass
def test_gen_filter_by_all_request():
f = context.gen_filter_by_all(context.cmp_contains, 'hello')
fn = context.gen_filter_by_all(context.cmp_contains, 'hello', negate=True)
# Nowhere
r = Request('GET / HTTP/1.1\r\n')
assert not f(r)
assert fn(r)
# Verb
r = Request('hello / HTTP/1.1\r\n')
assert f(r)
assert not fn(r)
# Path
r = Request('GET /hello HTTP/1.1\r\n')
assert f(r)
assert not fn(r)
# Data
r = Request('GET / HTTP/1.1\r\n')
r.raw_data = 'hello'
assert f(r)
assert not fn(r)
# Header key
r = Request('GET / HTTP/1.1\r\n')
r.headers['hello'] = 'goodbye'
assert f(r)
assert not fn(r)
# Header value
r = Request('GET / HTTP/1.1\r\n')
r.headers['goodbye'] = 'hello'
assert f(r)
assert not fn(r)
# Nowhere in headers
r = Request('GET / HTTP/1.1\r\n')
r.headers['goodbye'] = 'for real'
assert not f(r)
assert fn(r)
# Cookie key
r = Request('GET / HTTP/1.1\r\n')
r.cookies['hello'] = 'world'
r.update_from_objects()
assert f(r)
assert not fn(r)
# Cookie value
r = Request('GET / HTTP/1.1\r\n')
r.cookies['world'] = 'hello'
r.update_from_objects()
assert f(r)
assert not fn(r)
# Nowhere in cookie
r = Request('GET / HTTP/1.1\r\n')
r.cookies['world'] = 'sucks'
r.update_from_objects()
assert not f(r)
assert fn(r)
def test_gen_filter_by_all_response(http_request):
f = context.gen_filter_by_all(context.cmp_contains, 'hello')
fn = context.gen_filter_by_all(context.cmp_contains, 'hello', negate=True)
# Nowhere
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
assert not f(http_request)
assert fn(http_request)
# Response text
r = Response('HTTP/1.1 200 hello\r\n')
http_request.response = r
assert f(http_request)
assert not fn(http_request)
# Data
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.raw_data = 'hello'
assert f(http_request)
assert not fn(http_request)
# Header key
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.headers['hello'] = 'goodbye'
assert f(http_request)
assert not fn(http_request)
# Header value
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.headers['goodbye'] = 'hello'
assert f(http_request)
assert not fn(http_request)
# Nowhere in headers
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.headers['goodbye'] = 'for real'
assert not f(http_request)
assert fn(http_request)
# Cookie key
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.add_cookie(ResponseCookie('hello=goodbye'))
r.update_from_objects()
assert f(http_request)
assert not fn(http_request)
# Cookie value
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.add_cookie(ResponseCookie('goodbye=hello'))
r.update_from_objects()
assert f(http_request)
assert not fn(http_request)
# Nowhere in cookie
r = Response('HTTP/1.1 200 OK\r\n')
http_request.response = r
r.add_cookie(ResponseCookie('goodbye=for real'))
r.update_from_objects()
assert not f(http_request)
assert fn(http_request)
def test_filter_by_host(http_request):
f = context.gen_filter_by_host(context.cmp_contains, 'sexy')
fn = context.gen_filter_by_host(context.cmp_contains, 'sexy', negate=True)
http_request.headers['Host'] = 'google.com'
http_request.headers['MiscHeader'] = 'vim.sexy'
assert not f(http_request)
assert fn(http_request)
http_request.headers['Host'] = 'vim.sexy'
http_request.update_from_text()
assert http_request.host == 'vim.sexy'
assert f(http_request)
assert not fn(http_request)
def test_filter_by_body():
f = context.gen_filter_by_body(context.cmp_contains, 'sexy')
fn = context.gen_filter_by_body(context.cmp_contains, 'sexy', negate=True)
# Test request bodies
r = Request()
r.status_line = 'GET /sexy HTTP/1.1'
r.headers['Header'] = 'sexy'
r.raw_data = 'foo'
assert not f(r)
assert fn(r)
r.raw_data = 'sexy'
assert f(r)
assert not fn(r)
# Test response bodies
r = Request()
rsp = Response()
rsp.status_line = 'HTTP/1.1 200 OK'
rsp.headers['sexy'] = 'sexy'
r.status_line = 'GET /sexy HTTP/1.1'
r.headers['Header'] = 'sexy'
r.response = rsp
assert not f(r)
assert fn(r)
rsp.raw_data = 'sexy'
assert f(r)
assert not fn(r)
def test_filter_by_response_code(http_request):
f = context.gen_filter_by_response_code(context.cmp_eq, 200)
fn = context.gen_filter_by_response_code(context.cmp_eq, 200, negate=True)
r = Response()
http_request.response = r
r.status_line = 'HTTP/1.1 404 Not Found'
assert not f(http_request)
assert fn(http_request)
r.status_line = 'HTTP/1.1 200 OK'
assert f(http_request)
assert not fn(http_request)
def test_filter_by_raw_headers_request():
f1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:')
fn1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:', negate=True)
f2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader')
fn2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader', negate=True)
r = Request('GET / HTTP/1.1\r\n')
rsp = Response('HTTP/1.1 200 OK\r\n')
r.response = rsp
r.headers['Header'] = 'Sexy'
assert not f1(r)
assert fn1(r)
assert not f2(r)
assert fn2(r)
r = Request('GET / HTTP/1.1\r\n')
rsp = Response('HTTP/1.1 200 OK\r\n')
r.response = rsp
r.headers['Sexy'] = 'sexy'
assert f1(r)
assert not fn1(r)
assert not f2(r)
assert fn2(r)
r.headers['OtherHeader'] = 'sexy'
r.headers['Header'] = 'foo'
assert f1(r)
assert not fn1(r)
assert f2(r)
assert not fn2(r)
def test_filter_by_raw_headers_response():
f1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:')
fn1 = context.gen_filter_by_raw_headers(context.cmp_contains, 'Sexy:', negate=True)
f2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader')
fn2 = context.gen_filter_by_raw_headers(context.cmp_contains, 'sexy\r\nHeader', negate=True)
r = Request('GET / HTTP/1.1\r\n')
rsp = Response('HTTP/1.1 200 OK\r\n')
r.response = rsp
rsp.headers['Header'] = 'Sexy'
assert not f1(r)
assert fn1(r)
assert not f2(r)
assert fn2(r)
r = Request('GET / HTTP/1.1\r\n')
rsp = Response('HTTP/1.1 200 OK\r\n')
r.response = rsp
rsp.headers['Sexy'] = 'sexy'
assert f1(r)
assert not fn1(r)
assert not f2(r)
assert fn2(r)
rsp.headers['OtherHeader'] = 'sexy'
rsp.headers['Header'] = 'foo'
assert f1(r)
assert not fn1(r)
assert f2(r)
assert not fn2(r)
def test_filter_by_path(http_request):
f = context.gen_filter_by_path(context.cmp_contains, 'porn') # find the fun websites
fn = context.gen_filter_by_path(context.cmp_contains, 'porn', negate=True) # find the boring websites
http_request.status_line = 'GET / HTTP/1.1'
assert not f(http_request)
assert fn(http_request)
http_request.status_line = 'GET /path/to/great/porn HTTP/1.1'
assert f(http_request)
assert not fn(http_request)
http_request.status_line = 'GET /path/to/porn/great HTTP/1.1'
assert f(http_request)
assert not fn(http_request)
def test_gen_filter_by_submitted_cookies():
f1 = context.gen_filter_by_submitted_cookies(context.cmp_contains, 'Session')
f2 = context.gen_filter_by_submitted_cookies(context.cmp_contains, 'Cookie',
context.cmp_contains, 'CookieVal')
r = Request(('GET / HTTP/1.1\r\n'
'Cookie: foo=bar\r\n'
'\r\n'))
assert not f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Cookie: Session=bar\r\n'
'\r\n'))
assert f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Cookie: Session=bar; CookieThing=NoMatch\r\n'
'\r\n'))
assert f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Cookie: Session=bar; CookieThing=CookieValue\r\n'
'\r\n'))
assert f1(r)
assert f2(r)
def test_gen_filter_by_set_cookies():
f1 = context.gen_filter_by_set_cookies(context.cmp_contains, 'Session')
f2 = context.gen_filter_by_set_cookies(context.cmp_contains, 'Cookie',
context.cmp_contains, 'CookieVal')
r = Request('GET / HTTP/1.1\r\n\r\n')
rsp = Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'\r\n'))
r.response = rsp
assert not f1(r)
assert not f2(r)
r = Request('GET / HTTP/1.1\r\n\r\n')
rsp = Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'Set-Cookie: Session=Banana\r\n'
'\r\n'))
r.response = rsp
assert f1(r)
assert not f2(r)
r = Request('GET / HTTP/1.1\r\n\r\n')
rsp = Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'Set-Cookie: Session=Banana\r\n'
'Set-Cookie: CookieThing=NoMatch\r\n'
'\r\n'))
r.response = rsp
assert f1(r)
assert not f2(r)
r = Request('GET / HTTP/1.1\r\n\r\n')
rsp = Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'Set-Cookie: Session=Banana\r\n'
'Set-Cookie: CookieThing=CookieValue\r\n'
'\r\n'))
r.response = rsp
assert f1(r)
assert f2(r)
def test_filter_by_params_get():
f1 = context.gen_filter_by_params(context.cmp_contains, 'Session')
f2 = context.gen_filter_by_params(context.cmp_contains, 'Cookie',
context.cmp_contains, 'CookieVal')
r = Request('GET / HTTP/1.1\r\n\r\n')
assert not f1(r)
assert not f2(r)
r = Request('GET /?Session=foo HTTP/1.1\r\n\r\n')
assert f1(r)
assert not f2(r)
r = Request('GET /?Session=foo&CookieThing=Fail HTTP/1.1\r\n\r\n')
assert f1(r)
assert not f2(r)
r = Request('GET /?Session=foo&CookieThing=CookieValue HTTP/1.1\r\n\r\n')
assert f1(r)
assert f2(r)
def test_filter_by_params_post():
f1 = context.gen_filter_by_params(context.cmp_contains, 'Session')
f2 = context.gen_filter_by_params(context.cmp_contains, 'Cookie',
context.cmp_contains, 'CookieVal')
r = Request(('GET / HTTP/1.1\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
r.raw_data = 'foo=bar'
assert not f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
r.raw_data = 'Session=bar'
assert f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
r.raw_data = 'Session=bar&Cookie=foo'
assert f1(r)
assert not f2(r)
r = Request(('GET / HTTP/1.1\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n\r\n'))
r.raw_data = 'Session=bar&CookieThing=CookieValue'
assert f1(r)
assert f2(r)

View file

@ -0,0 +1,994 @@
import base64
import gzip
import json
import pytest
import StringIO
import zlib
from pappy import http
####################
# Helper Functions
class TException(Exception):
pass
def by_lines_and_full_helper(Type, id_attr, load_func, header_lines, data=''):
# Creates a request/response and returns versions created/recreated in
# different ways. All of them should be equivalent.
# Returned:
# (created with constructor,
# created with add_line and add_data
# after calling update() on it,
# created by serializing and unserializing to json)
t_lines = Type()
for l in header_lines:
t_lines.add_line(l)
if data:
t_lines.add_data(data)
t_fulls = '\r\n'.join(header_lines)+'\r\n'
t_fulls += data
t_full = Type(t_fulls)
t_updated = Type(t_fulls)
t_json = Type(t_fulls)
t_json.from_json(t_json.to_json())
return (t_full, t_lines, t_updated, t_json)
def req_by_lines_and_full(header_lines, data=''):
# Generates r_full, r_lines using the given header lines and data
# r_lines is created with add_line/add_data and r_full is created with
# the constructor
return by_lines_and_full_helper(http.Request, 'reqid',
http.Request.load_request,
header_lines, data)
def rsp_by_lines_and_full(header_lines, data=''):
# Generates r_full, r_lines using the given header lines and data
# r_lines is created with add_line/add_data and r_full is created with
# the constructor
return by_lines_and_full_helper(http.Response, 'rspid',
http.Response.load_response,
header_lines, data)
def gzip_string(string):
out = StringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write(string)
return out.getvalue()
def deflate_string(string):
return StringIO.StringIO(zlib.compress(string)).read()
def check_response_cookies(exp_pairs, rsp):
pairs = rsp.cookies.all_pairs()
pairs = [(c.key, c.val) for k, c in pairs]
assert pairs == exp_pairs
####################
# Data storage
def test_chunked_simple():
# Test a simple add_data
c = http.ChunkedData()
assert (not c.complete)
full_data = '5\r\n'
full_data += 'A'*5
full_data += '\r\n'
full_data += '0\r\n\r\n'
c.add_data(full_data)
assert c.complete
assert c.raw_data == 'A'*5
def test_chunked_hex():
# Test hex lengths
c = http.ChunkedData()
full_data = 'af\r\n'
full_data += 'A'*0xAF
full_data += '\r\n'
full_data += '0\r\n\r\n'
c.add_data(full_data)
assert c.complete
assert c.raw_data == 'A'*0xAF
c = http.ChunkedData()
full_data = 'AF\r\n'
full_data += 'A'*0xAF
full_data += '\r\n'
full_data += '0\r\n\r\n'
c.add_data(full_data)
assert c.complete
assert c.raw_data == 'A'*0xAF
c = http.ChunkedData()
full_data = 'aF\r\n'
full_data += 'A'*0xAF
full_data += '\r\n'
full_data += '0\r\n\r\n'
c.add_data(full_data)
assert c.complete
assert c.raw_data == 'A'*0xAF
def test_chunked_leading_zeros():
# Test leading zeros
c = http.ChunkedData()
full_data = '000000000000000aF\r\n'
full_data += 'A'*0xAF
full_data += '\r\n'
full_data += '0\r\n\r\n'
c.add_data(full_data)
assert c.complete
assert c.raw_data == 'A'*0xAF
def test_chunked_one_char_add():
# Test adding one character at a time
c = http.ChunkedData()
full_data = 'af\r\n'
full_data += 'A'*0xAF
full_data += '\r\n'
full_data += '0\r\n\r\n'
for ch in full_data:
c.add_data(ch)
assert c.complete
assert c.raw_data == 'A'*0xAF
def test_chunked_incomplete():
# Tests that complete isn't true until the data is received
full_data = 'af\r\n'
full_data += 'A'*0xAF
full_data += '\r\n'
full_data += '0' # right now we're fine ending on 0 without \r\n
for i in range(len(full_data)-1):
c = http.ChunkedData()
c.add_data(full_data[:i])
assert not c.complete
# Test incomplete one character at a time
full_data = 'af\r\n'
full_data += 'A'*0xAF
full_data += '\r\n'
full_data += '0' # right now we're fine ending on 0 without \r\n
for i in range(len(full_data)-1):
c = http.ChunkedData()
for ii in range(i):
c.add_data(full_data[ii])
assert not c.complete
def test_length_data_simple():
# Basic test
l = http.LengthData(100)
assert not l.complete
l.add_data('A'*100)
assert l.complete
assert l.raw_data == 'A'*100
l = http.LengthData(0)
assert l.complete
assert l.raw_data == ''
# Test incomplete
l = http.LengthData(100)
l.add_data('A'*99)
assert not l.complete
def test_length_one_character():
# Test adding one character at a time
l = http.LengthData(100)
for i in range(100):
l.add_data('A')
assert l.complete
assert l.raw_data == 'A'*100
# Test adding one character at a time (incomplete)
l = http.LengthData(100)
for i in range(99):
l.add_data('A')
assert not l.complete
def test_length_overflow():
# Test only saving the given number of chars
l = http.LengthData(100)
l.add_data('A'*400)
assert l.complete
assert l.raw_data == 'A'*100
# Test throwing an exception when adding data after complete
l = http.LengthData(100)
l.add_data('A'*100)
with pytest.raises(http.DataAlreadyComplete):
l.add_data('A')
def test_repeatable_dict_simple():
d = http.RepeatableDict()
assert not 'foo' in d
d['foo'] = 'bar'
assert 'foo' in d
d['baz'] = 'fuzz'
d.append('foo', 'fizz')
assert d['foo'] == 'fizz'
assert d['baz'] == 'fuzz'
assert d.all_vals('foo') == ['bar', 'fizz']
assert d.all_pairs() == [('foo', 'bar'),
('baz', 'fuzz'),
('foo', 'fizz')]
assert not 'fee' in d
d.add_pairs([('fee', 'fi'),
('foo', 'fo')])
assert 'fee' in d
assert d['fee'] == 'fi'
assert d['baz'] == 'fuzz'
assert d['foo'] == 'fo'
assert d.all_vals('foo') == ['bar', 'fizz', 'fo']
assert d.all_pairs() == [('foo', 'bar'),
('baz', 'fuzz'),
('foo', 'fizz'),
('fee', 'fi'),
('foo', 'fo')]
def test_repeatable_dict_constructor():
d = http.RepeatableDict([('foo','bar'),('baz','fuzz')])
assert 'foo' in d
assert d['foo'] == 'bar'
assert d['baz'] == 'fuzz'
assert d.all_vals('foo') == ['bar']
assert d.all_pairs() == [('foo', 'bar'),
('baz', 'fuzz')]
def test_repeatable_dict_case_insensitive():
def test(d):
assert 'foo' in d
assert 'fOo' in d
assert d['foo'] == 'fuzz'
assert d['Foo'] == 'fuzz'
assert d['FoO'] == 'fuzz'
assert d.all_vals('foo') == ['bar', 'fuzz']
assert d.all_vals('Foo') == ['bar', 'fuzz']
assert d.all_vals('FoO') == ['bar', 'fuzz']
assert d.all_pairs() == [('foo', 'bar'),
('fOo', 'fuzz')]
d = http.RepeatableDict([('foo','bar'),('fOo','fuzz')], case_insensitive=True)
test(d)
d = http.RepeatableDict(case_insensitive=True)
d['foo'] = 'bar'
d.append('fOo', 'fuzz')
test(d)
d = http.RepeatableDict(case_insensitive=True)
d.add_pairs([('foo','bar'),('fOo','fuzz')])
test(d)
def test_repeatable_dict_overwrite():
d = http.RepeatableDict([('foo','bar'),('foo','fuzz'),('bar','baz')])
d['foo'] = 'asdf'
assert d.all_vals('foo') == ['asdf']
def test_repeatable_dict_deletion():
d = http.RepeatableDict([('foo','bar'),('fOo','fuzz'),('bar','baz')],
case_insensitive=True)
assert 'foo' in d
del d['foo']
assert not 'foo' in d
with pytest.raises(KeyError):
x = d['foo']
with pytest.raises(KeyError):
x = d['fOo']
assert d['bar'] == 'baz'
assert d.all_vals('foo') == []
def test_repeatable_dict_callback():
def f():
raise TException()
r = http.RepeatableDict()
r['a'] = 'b'
r.add_pairs([('c', 'd')])
r.update('a', 'c')
r.set_modify_callback(f)
with pytest.raises(TException):
r['a'] = 'b'
with pytest.raises(TException):
r.add_pairs([('c', 'd')])
with pytest.raises(TException):
r.update('a', 'c')
####################
## Cookies
def test_response_cookie_simple():
s = 'ck=1234;'
c = http.ResponseCookie(s)
assert c.key == 'ck'
assert c.val == '1234'
assert not c.secure
assert not c.http_only
assert c.domain is None
assert c.expires is None
assert c.max_age is None
assert c.path is None
def test_response_cookie_params():
s = 'ck=1234; Expires=Wed, 09 Jun 2021 10:18:14 GMT; secure; httponly; path=/; max-age=12; domain=.foo.bar'
c = http.ResponseCookie(s)
assert c.key == 'ck'
assert c.val == '1234'
assert c.domain == '.foo.bar'
assert c.expires == 'Wed, 09 Jun 2021 10:18:14 GMT'
assert c.http_only
assert c.max_age == 12
assert c.path == '/'
assert c.secure
def test_response_cookie_parsing():
s = 'ck=1234=567;Expires=Wed, 09 Jun 2021 10:18:14 GMT;secure;httponly;path=/;max-age=12;domain=.foo.bar'
c = http.ResponseCookie(s)
assert c.key == 'ck'
assert c.val == '1234=567'
assert c.domain == '.foo.bar'
assert c.expires == 'Wed, 09 Jun 2021 10:18:14 GMT'
assert c.http_only
assert c.max_age == 12
assert c.path == '/'
assert c.secure
def test_response_cookie_generate():
pass
####################
## Request tests
def test_request_simple():
header_lines = [
'GET / HTTP/1.1',
'Content-Type: text/xml; charset="utf-8"',
'Accept-Encoding: gzip,deflate',
'User-Agent: TestAgent',
'Host: www.test.com',
'Content-Length: 100',
'Connection: Keep-Alive',
'Cache-Control: no-cache',
'',
]
headers = '\r\n'.join(header_lines)+'\r\n'
data = 'A'*100
rf, rl, ru, rj = req_by_lines_and_full(header_lines, data)
def test(r):
assert r.complete
assert r.fragment == None
assert r.full_request == headers+data
assert r.header_len == len(headers)
assert r.headers_complete
assert r.host == 'www.test.com'
assert r.is_ssl == False
assert r.path == '/'
assert r.port == 80
assert r.status_line == 'GET / HTTP/1.1'
assert r.verb == 'GET'
assert r.version == 'HTTP/1.1'
assert r.headers['Content-Length'] == '100'
assert r.headers['CoNtent-lENGTH'] == '100'
assert r.headers['Content-Type'] == 'text/xml; charset="utf-8"'
assert r.headers['Accept-Encoding'] == 'gzip,deflate'
assert r.headers['User-Agent'] == 'TestAgent'
assert r.headers['Host'] == 'www.test.com'
assert r.headers['Connection'] == 'Keep-Alive'
assert r.headers['Cache-Control'] == 'no-cache'
assert r.raw_data == 'A'*100
test(rf)
test(rl)
test(ru)
test(rj)
def test_request_urlparams():
header_lines = [
'GET /?p1=foo&p2=bar#frag HTTP/1.1',
'Content-Length: 0',
'',
]
rf, rl, ru, rj = req_by_lines_and_full(header_lines)
def test(r):
assert r.complete
assert r.fragment == 'frag'
assert r.get_params['p1'] == 'foo'
assert r.get_params['p2'] == 'bar'
assert r.full_request == ('GET /?p1=foo&p2=bar#frag HTTP/1.1\r\n'
'Content-Length: 0\r\n'
'\r\n')
test(rf)
test(rl)
test(ru)
test(rj)
def test_request_questionmark_url():
header_lines = [
'GET /path/??/to/?p1=foo&p2=bar#frag HTTP/1.1',
'Content-Length: 0',
'',
]
rf, rl, ru, rj = req_by_lines_and_full(header_lines)
def test(r):
assert r.complete
assert r.fragment == 'frag'
assert r.get_params['?/to/?p1'] == 'foo'
assert r.get_params['p2'] == 'bar'
assert r.full_request == ('GET /path/??/to/?p1=foo&p2=bar#frag HTTP/1.1\r\n'
'Content-Length: 0\r\n'
'\r\n')
test(rf)
test(rl)
test(ru)
test(rj)
def test_request_postparams():
header_lines = [
'GET / HTTP/1.1',
'Content-Length: 9',
'Content-Type: application/x-www-form-urlencoded',
'',
]
data = 'a=b&c=dee'
rf, rl, ru, rj = req_by_lines_and_full(header_lines, data)
def test(r):
assert r.complete
assert r.post_params['a'] == 'b'
assert r.post_params['c'] == 'dee'
test(rf)
test(rl)
test(ru)
test(rj)
def test_headers_end():
header_lines = [
'GET / HTTP/1.1',
'Content-Type: text/xml; charset="utf-8"',
'Accept-Encoding: gzip,deflate',
'User-Agent: TestAgent',
'Host: www.test.com',
'Content-Length: 100',
'Connection: Keep-Alive',
'Cache-Control: no-cache',
'',
]
r = http.Request()
for l in header_lines:
r.add_line(l)
assert not r.complete
assert r.headers_complete
def test_request_cookies():
header_lines = [
'GET /?p1=foo&p2=bar#frag HTTP/1.1',
'Content-Length: 0',
'Cookie: abc=WRONG; def=456; ghi=789; abc=123',
'',
]
rf, rl, ru, rj = req_by_lines_and_full(header_lines)
def test(r):
assert r.complete
assert r.cookies['abc'] == '123'
assert r.cookies['def'] == '456'
assert r.cookies['ghi'] == '789'
assert r.cookies.all_vals('abc') == ['WRONG', '123']
test(rf)
test(rl)
test(ru)
test(rj)
def test_request_parse_host():
header_lines = [
'GET / HTTP/1.1',
'Content-Length: 0',
'Host: www.test.com:443',
'',
]
rf, rl, ru, rj = req_by_lines_and_full(header_lines)
def test(r):
assert r.complete
assert r.host == 'www.test.com'
assert r.is_ssl
test(rf)
test(rl)
test(ru)
test(rj)
def test_request_newline_delim():
r = http.Request(('GET / HTTP/1.1\n'
'Test-Header: foo\r\n'
'Other-header: bar\n\r\n'))
assert r.full_request == ('GET / HTTP/1.1\r\n'
'Test-Header: foo\r\n'
'Other-header: bar\r\n\r\n')
def test_repeated_request_headers():
header_lines = [
'GET /?p1=foo&p2=bar#frag HTTP/1.1',
'Content-Length: 0',
'Test-Header: WRONG',
'Test-Header: RIGHTiguess',
'',
]
rf, rl, ru, rj = req_by_lines_and_full(header_lines)
def test(r):
assert r.complete
assert r.headers['test-header'] == 'RIGHTiguess'
test(rf)
test(rl)
test(ru)
test(rj)
def test_request_update_statusline():
r = http.Request()
r.status_line = 'GET / HTTP/1.1'
assert r.verb == 'GET'
assert r.path == '/'
assert r.version == 'HTTP/1.1'
assert not r.complete
assert r.full_request == 'GET / HTTP/1.1\r\n\r\n'
def test_request_update_cookies():
r = http.Request()
r.status_line = 'GET / HTTP/1.1'
# Check new cookies
r.cookies['foo'] = 'bar'
r.cookies['baz'] = 'fuzz'
assert r.full_request == ('GET / HTTP/1.1\r\n'
'Cookie: foo=bar; baz=fuzz\r\n'
'\r\n')
# Check updated cookies (should be updated in place)
r.cookies['foo'] = 'buzz'
assert r.full_request == ('GET / HTTP/1.1\r\n'
'Cookie: foo=buzz; baz=fuzz\r\n'
'\r\n')
# Check repeated cookies
r.cookies.append('foo', 'bar')
assert r.full_request == ('GET / HTTP/1.1\r\n'
'Cookie: foo=buzz; baz=fuzz; foo=bar\r\n'
'\r\n')
def test_request_update_headers():
r = http.Request()
r.status_line = 'GET / HTTP/1.1'
r.headers['Content-Length'] = '0'
r.headers['Test-Header'] = 'Test Value'
r.headers['Other-Header'] = 'Other Value'
r.headers['Host'] = 'www.test.com'
r.headers.append('Test-Header', 'Test Value2')
assert r.full_request == ('GET / HTTP/1.1\r\n'
'Content-Length: 0\r\n'
'Test-Header: Test Value\r\n'
'Other-Header: Other Value\r\n'
'Host: www.test.com\r\n'
'Test-Header: Test Value2\r\n'
'\r\n')
assert r.host == 'www.test.com'
def test_request_modified_headers():
r = http.Request()
r.status_line = 'GET / HTTP/1.1'
r.headers['content-length'] = '100'
r.headers['cookie'] = 'abc=123'
r.cookies['abc'] = '456'
r.raw_data = 'AAAA'
assert r.full_request == ('GET / HTTP/1.1\r\n'
'content-length: 4\r\n'
'cookie: abc=456\r\n\r\n'
'AAAA')
assert r.headers['content-length'] == '4'
assert r.headers['cookie'] == 'abc=456'
def test_request_update_data():
r = http.Request()
r.status_line = 'GET / HTTP/1.1'
r.headers['content-length'] = 500
r.raw_data = 'AAAA'
assert r.full_request == ('GET / HTTP/1.1\r\n'
'content-length: 4\r\n'
'\r\n'
'AAAA')
def test_request_to_json():
r = http.Request()
r.status_line = 'GET / HTTP/1.1'
r.headers['content-length'] = 500
r.raw_data = 'AAAA'
r.reqid = 1
rsp = http.Response()
rsp.status_line = 'HTTP/1.1 200 OK'
rsp.rspid = 2
r.response = rsp
expected_reqdata = {'full_request': base64.b64encode(r.full_request),
'response_id': rsp.rspid,
#'tag': r.tag,
'reqid': r.reqid,
}
assert json.loads(r.to_json()) == expected_reqdata
def test_request_update_content_length():
r = http.Request(('GET / HTTP/1.1\r\n'
'Content-Length: 4\r\n\r\n'
'AAAAAAAAAA'), update_content_length=True)
assert r.full_request == (('GET / HTTP/1.1\r\n'
'Content-Length: 10\r\n\r\n'
'AAAAAAAAAA'))
def test_request_blank_get_params():
r = http.Request()
r.add_line('GET /this/??-asdf/ HTTP/1.1')
assert r.full_request == ('GET /this/??-asdf/ HTTP/1.1\r\n\r\n')
r = http.Request()
r.add_line('GET /this/??-asdf/?a=b&c&d=ef HTTP/1.1')
assert r.full_request == ('GET /this/??-asdf/?a=b&c&d=ef HTTP/1.1\r\n\r\n')
assert r.get_params['?-asdf/?a'] == 'b'
assert r.get_params['c'] == None
assert r.get_params['d'] == 'ef'
####################
## Response tests
def test_response_simple():
header_lines = [
'HTTP/1.1 200 OK',
'Date: Thu, 22 Oct 2015 00:37:17 GMT',
'Cache-Control: private, max-age=0',
'Content-Type: text/html; charset=UTF-8',
'Server: gws',
'Content-Length: 100',
'',
]
data = 'A'*100
header_len = len('\r\n'.join(header_lines)+'\r\n')
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data)
def test(r):
assert r.complete
assert r.header_len == header_len
assert r.raw_data == data
assert r.response_code == 200
assert r.response_text == 'OK'
assert r.status_line == 'HTTP/1.1 200 OK'
assert r.version == 'HTTP/1.1'
assert r.headers['Date'] == 'Thu, 22 Oct 2015 00:37:17 GMT'
assert r.headers['Cache-Control'] == 'private, max-age=0'
assert r.headers['Content-Type'] == 'text/html; charset=UTF-8'
assert r.headers['Server'] == 'gws'
assert r.headers['Content-Length'] == '100'
assert r.headers['CoNTEnT-leNGTH'] == '100'
test(rf)
test(rl)
test(ru)
test(rj)
def test_response_chunked():
header_lines = [
'HTTP/1.1 200 OK',
'Date: Thu, 22 Oct 2015 00:37:17 GMT',
'Cache-Control: private, max-age=0',
'Content-Type: text/html; charset=UTF-8',
'Server: gws',
'Transfer-Encoding: chunked',
'',
]
data = 'af\r\n'
data += 'A'*0xAF + '\r\n'
data += 'BF\r\n'
data += 'B'*0xBF + '\r\n'
data += '0\r\n\r\n'
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data)
def test(r):
assert r.complete
assert r.raw_data == 'A'*0xAF + 'B'*0xBF
test(rf)
test(rl)
test(ru)
test(rj)
def test_response_gzip():
data_decomp = 'Hello woru!'
data_comp = gzip_string(data_decomp)
header_lines = [
'HTTP/1.1 200 OK',
'Date: Thu, 22 Oct 2015 00:37:17 GMT',
'Cache-Control: private, max-age=0',
'Content-Type: text/html; charset=UTF-8',
'Server: gws',
'Content-Encoding: gzip',
'Content-Length: %d' % len(data_comp),
'',
]
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data_comp)
def test(r):
assert r.complete
assert r.raw_data == data_decomp
test(rf)
test(rl)
test(ru)
test(rj)
def test_response_deflate():
data_decomp = 'Hello woru!'
data_comp = deflate_string(data_decomp)
header_lines = [
'HTTP/1.1 200 OK',
'Date: Thu, 22 Oct 2015 00:37:17 GMT',
'Cache-Control: private, max-age=0',
'Content-Type: text/html; charset=UTF-8',
'Server: gws',
'Content-Encoding: deflate',
'Content-Length: %d' % len(data_comp),
'',
]
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data_comp)
def test(r):
assert r.complete
assert r.raw_data == data_decomp
test(rf)
test(rl)
test(ru)
test(rj)
def test_response_chunked_gzip():
data_decomp = 'Hello world!'
data_comp = gzip_string(data_decomp)
assert len(data_comp) > 3
data_chunked = '3\r\n'
data_chunked += data_comp[:3]
data_chunked += '\r\n%x\r\n' % (len(data_comp[3:]))
data_chunked += data_comp[3:]
data_chunked += '\r\n0\r\n'
header_lines = [
'HTTP/1.1 200 OK',
'Date: Thu, 22 Oct 2015 00:37:17 GMT',
'Cache-Control: private, max-age=0',
'Content-Type: text/html; charset=UTF-8',
'Server: gws',
'Content-Encoding: gzip',
'Transfer-Encoding: chunked',
'',
]
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data_chunked)
def test(r):
assert r.complete
assert r.raw_data == data_decomp
assert r.headers['Content-Length'] == str(len(data_decomp))
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
'Date: Thu, 22 Oct 2015 00:37:17 GMT\r\n'
'Cache-Control: private, max-age=0\r\n'
'Content-Type: text/html; charset=UTF-8\r\n'
'Server: gws\r\n'
'Content-Length: %d\r\n\r\n'
'%s') % (len(data_decomp), data_decomp)
test(rf)
test(rl)
test(ru)
test(rj)
def test_response_early_completion():
r = http.Response()
r.status_line = 'HTTP/1.1 200 OK'
r.add_line('Content-Length: 0')
assert not r.complete
r.add_line('')
assert r.complete
def test_response_cookies():
header_lines = [
'HTTP/1.1 200 OK',
'Content-Length: 0',
'Set-Cookie: ck=1234=567;Expires=Wed, 09 Jun 2021 10:18:14 GMT;secure;httponly;path=/;max-age=12;domain=.foo.bar',
'Set-Cookie: abc=123',
'Set-Cookie: def=456',
'',
]
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines)
def test(r):
assert r.complete
assert r.cookies['ck'].key == 'ck'
assert r.cookies['ck'].val == '1234=567'
assert r.cookies['ck'].domain == '.foo.bar'
assert r.cookies['ck'].expires == 'Wed, 09 Jun 2021 10:18:14 GMT'
assert r.cookies['ck'].http_only
assert r.cookies['ck'].max_age == 12
assert r.cookies['ck'].path == '/'
assert r.cookies['ck'].secure
assert r.cookies['abc'].val == '123'
assert r.cookies['def'].val == '456'
test(rf)
test(rl)
test(ru)
test(rj)
def test_response_repeated_cookies():
r = http.Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'Set-Cookie: baz=buzz\r\n'
'Set-Cookie: foo=buzz\r\n'
'\r\n'))
expected_pairs = [('foo', 'bar'), ('baz', 'buzz'), ('foo', 'buzz')]
check_response_cookies(expected_pairs, r)
def test_repeated_response_headers():
# Repeated headers can be used for attacks, so ironically we have to handle
# them well. We always use the last header as the correct one.
header_lines = [
'HTTP/1.1 200 OK',
'Content-Length: 0',
'Test-Head: WRONG',
'Test-Head: RIGHTish',
'',
]
rf, rl, ru, rj = rsp_by_lines_and_full(header_lines)
def test(r):
assert r.complete
assert r.headers['test-head'] == 'RIGHTish'
test(rf)
test(rl)
test(ru)
test(rj)
def test_response_update_statusline():
r = http.Response()
r.status_line = 'HTTP/1.1 200 OK'
assert r.version == 'HTTP/1.1'
assert r.response_code == 200
assert r.response_text == 'OK'
assert not r.complete
assert r.full_response == 'HTTP/1.1 200 OK\r\n\r\n'
def test_response_update_headers():
r = http.Response()
r.status_line = 'HTTP/1.1 200 OK'
r.headers['Test-Header'] = 'Test Value'
r.headers['Other-Header'] = 'Other Value'
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
'Test-Header: Test Value\r\n'
'Other-Header: Other Value\r\n\r\n')
r.headers.append('Test-Header', 'Other Test Value')
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
'Test-Header: Test Value\r\n'
'Other-Header: Other Value\r\n'
'Test-Header: Other Test Value\r\n\r\n')
def test_response_update_modified_headers():
r = http.Response()
r.status_line = 'HTTP/1.1 200 OK'
r.headers['content-length'] = '500'
r.raw_data = 'AAAA'
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
'content-length: 4\r\n\r\n'
'AAAA')
assert r.headers['content-length'] == '4'
def test_response_update_cookies():
r = http.Response()
r.status_line = 'HTTP/1.1 200 OK'
# Test by adding headers
r.headers['Set-Cookie'] = 'abc=123'
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: abc=123\r\n\r\n')
assert r.cookies['abc'].val == '123'
r.headers.append('Set-Cookie', 'abc=456')
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: abc=123\r\n'
'Set-Cookie: abc=456\r\n\r\n'
)
assert r.cookies['abc'].val == '456'
r = http.Response()
r.status_line = 'HTTP/1.1 200 OK'
# Test by adding cookie objects
c = http.ResponseCookie('abc=123; secure')
r.cookies['abc'] = c
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: abc=123; secure\r\n\r\n')
def test_response_update_content_length():
r = http.Response(('HTTP/1.1 200 OK\r\n'
'Content-Length: 4\r\n\r\n'
'AAAAAAAAAA'), update_content_length=True)
assert r.full_response == (('HTTP/1.1 200 OK\r\n'
'Content-Length: 10\r\n\r\n'
'AAAAAAAAAA'))
def test_response_to_json():
rsp = http.Response()
rsp.status_line = 'HTTP/1.1 200 OK'
rsp.rspid = 2
expected_reqdata = {'full_response': base64.b64encode(rsp.full_response),
'rspid': rsp.rspid,
#'tag': r.tag,
}
assert json.loads(rsp.to_json()) == expected_reqdata
def test_response_update_from_objects_cookies():
r = http.Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'Set-Cookie: baz=buzz\r\n'
'Header: out of fucking nowhere\r\n'
'Set-Cookie: foo=buzz\r\n'
'\r\n'))
expected_pairs = [('foo', 'bar'), ('baz', 'buzz'), ('foo', 'buzz')]
check_response_cookies(expected_pairs, r)
new_pairs = [('foo', http.ResponseCookie('foo=banana')),
('baz', http.ResponseCookie('baz=buzz')),
('scooby', http.ResponseCookie('scooby=doo')),
('foo', http.ResponseCookie('foo=boo'))]
r.cookies.clear()
r.cookies.add_pairs(new_pairs)
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
'Header: out of fucking nowhere\r\n'
'Set-Cookie: foo=banana\r\n'
'Set-Cookie: baz=buzz\r\n'
'Set-Cookie: scooby=doo\r\n'
'Set-Cookie: foo=boo\r\n'
'\r\n')
expected_pairs = [('foo', 'banana'), ('baz', 'buzz'), ('scooby', 'doo'), ('foo', 'boo')]
check_response_cookies(expected_pairs, r)
def test_response_update_from_objects_cookies_replace():
r = http.Response(('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=bar\r\n'
'Set-Cookie: baz=buzz\r\n'
'Header: out of fucking nowhere\r\n'
'Set-Cookie: foo=buzz\r\n'
'\r\n'))
expected_pairs = [('foo', 'bar'), ('baz', 'buzz'), ('foo', 'buzz')]
check_response_cookies(expected_pairs, r)
r.cookies['foo'] = http.ResponseCookie('foo=banana')
assert r.full_response == ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: foo=banana\r\n'
'Set-Cookie: baz=buzz\r\n'
'Header: out of fucking nowhere\r\n'
'\r\n')

View file

@ -0,0 +1,36 @@
import pytest
from proxy import ProxyClient, ProxyClientFactory, ProxyServer
from testutil import mock_deferred
from twisted.internet.protocol import ServerFactory
from twisted.test import proto_helpers
from twisted.internet import defer
####################
## Fixtures
@pytest.fixture
def proxyserver():
factory = ServerFactory()
factory.protocol = ProxyServer
protocol = factory.buildProtocol(('127.0.0.1', 0))
transport = proto_helpers.StringTransport()
protocol.makeConnection(transport)
return (protocol, transport)
####################
## Basic tests
def test_proxy_server_fixture(proxyserver):
prot = proxyserver[0]
tr = proxyserver[1]
prot.transport.write('hello')
print tr.value()
assert tr.value() == 'hello'
@pytest.inlineCallbacks
def test_mock_deferreds(mock_deferred):
d = mock_deferred('Hello!')
r = yield d
assert r == 'Hello!'

View file

@ -0,0 +1,15 @@
import pytest
from twisted.internet import defer
@pytest.fixture
def mock_deferred():
# Generates a function that can be used to make a deferred that can be used
# to mock out deferred-returning responses
def f(value):
def g(data):
return value
d = defer.Deferred()
d.addCallback(g)
d.callback(None)
return d
return f