"""Tests for the ``http`` module imported from ``pappyproxy.pappy``.

Covers the body-accumulation helpers (``ChunkedData``, ``LengthData``),
``RepeatableDict``, ``ResponseCookie``, ``HTTPMessage``, ``Request`` and
``Response``.

NOTE: this module targets Python 2 (it relies on ``StringIO`` and
``unicode``).
"""

import base64
import copy
import gzip
import json
import StringIO
import zlib

import pytest

from pappyproxy.pappy import http
from pappyproxy.util import PappyException


####################
# Helper Functions

class TException(Exception):
    """Marker exception raised by test callbacks so it can be asserted on."""
    pass


def by_lines_and_full_helper(Type, id_attr, load_func, header_lines, data=''):
    """Build several instances of ``Type`` that should all be equivalent.

    ``Type`` is the message class to instantiate. ``id_attr`` and
    ``load_func`` are accepted for interface compatibility but are not
    used here. ``header_lines`` is the list of header lines (without line
    terminators) and ``data`` is the optional body.

    Returns a 4-tuple ``(t_full, t_lines, t_updated, t_json)``:

    * ``t_full``    -- created by passing the full message to the constructor
    * ``t_lines``   -- created with ``add_line`` / ``add_data``
    * ``t_updated`` -- currently created exactly like ``t_full`` (no
      additional update call is made on it here)
    * ``t_json``    -- created from the full message, then round-tripped
      through ``to_json`` / ``from_json``
    """
    t_lines = Type()
    for line in header_lines:
        t_lines.add_line(line)
    if data:
        t_lines.add_data(data)

    t_fulls = '\r\n'.join(header_lines) + '\r\n'
    t_fulls += data

    t_full = Type(t_fulls)
    t_updated = Type(t_fulls)

    t_json = Type(t_fulls)
    t_json.from_json(t_json.to_json())

    return (t_full, t_lines, t_updated, t_json)


def req_by_lines_and_full(header_lines, data=''):
    """Return the equivalent ``http.Request`` variants for the given input.

    See ``by_lines_and_full_helper`` for the meaning of the returned tuple.
    """
    return by_lines_and_full_helper(http.Request, 'reqid',
                                    http.Request.load_request,
                                    header_lines, data)


def rsp_by_lines_and_full(header_lines, data=''):
    """Return the equivalent ``http.Response`` variants for the given input.

    See ``by_lines_and_full_helper`` for the meaning of the returned tuple.
    """
    return by_lines_and_full_helper(http.Response, 'rspid',
                                    http.Response.load_response,
                                    header_lines, data)


def gzip_string(string):
    """Return ``string`` compressed in gzip format."""
    out = StringIO.StringIO()
    # The GzipFile must be closed (end of the ``with``) before reading the
    # buffer, otherwise the trailing gzip data has not been written yet.
    with gzip.GzipFile(fileobj=out, mode="w") as f:
        f.write(string)
    return out.getvalue()


def deflate_string(string):
    """Return ``string`` as a raw deflate stream.

    ``zlib.compress`` wraps the deflate data in a 2-byte header and a
    4-byte trailing checksum; both are sliced off here.
    """
    return zlib.compress(string)[2:-4]


def check_response_cookies(exp_pairs, rsp):
    """Assert that ``rsp``'s cookies are exactly ``exp_pairs``, in order.

    ``exp_pairs`` is a list of ``(key, val)`` tuples compared against the
    ``key`` / ``val`` attributes of each cookie in ``rsp.cookies``.
    """
    pairs = [(c.key, c.val) for _, c in rsp.cookies.all_pairs()]
    assert pairs == exp_pairs


####################
# Data storage

def test_chunked_simple():
    """A single chunk followed by the terminating chunk completes the body."""
    c = http.ChunkedData()
    assert (not c.complete)

    full_data = '5\r\n'
    full_data += 'A'*5
    full_data += '\r\n'
    full_data += '0\r\n\r\n'
    c.add_data(full_data)
    assert c.complete
    assert c.body == 'A'*5


def test_chunked_hex():
    """Chunk lengths are parsed as hex regardless of letter case."""
    for length in ('af', 'AF', 'aF'):
        c = http.ChunkedData()
        full_data = length + '\r\n'
        full_data += 'A'*0xAF
        full_data += '\r\n'
        full_data += '0\r\n\r\n'
        c.add_data(full_data)
        assert c.complete
        assert c.body == 'A'*0xAF


def test_chunked_leading_zeros():
    """Leading zeros in the chunk length are accepted."""
    c = http.ChunkedData()
    full_data = '000000000000000aF\r\n'
    full_data += 'A'*0xAF
    full_data += '\r\n'
    full_data += '0\r\n\r\n'
    c.add_data(full_data)
    assert c.complete
    assert c.body == 'A'*0xAF


def test_chunked_one_char_add():
    """Feeding the chunked data one character at a time still completes."""
    c = http.ChunkedData()
    full_data = 'af\r\n'
    full_data += 'A'*0xAF
    full_data += '\r\n'
    full_data += '0\r\n\r\n'
    for ch in full_data:
        c.add_data(ch)
    assert c.complete
    assert c.body == 'A'*0xAF


def test_chunked_incomplete():
    """``complete`` stays false for every strict prefix of the data."""
    full_data = 'af\r\n'
    full_data += 'A'*0xAF
    full_data += '\r\n'
    full_data += '0'  # right now we're fine ending on 0 without \r\n

    # Each prefix added in a single call
    for i in range(len(full_data)-1):
        c = http.ChunkedData()
        c.add_data(full_data[:i])
        assert not c.complete

    # Each prefix added one character at a time
    for i in range(len(full_data)-1):
        c = http.ChunkedData()
        for ii in range(i):
            c.add_data(full_data[ii])
        assert not c.complete


def test_length_data_simple():
    """``LengthData`` completes once exactly the declared length is added."""
    ld = http.LengthData(100)
    assert not ld.complete
    ld.add_data('A'*100)
    assert ld.complete
    assert ld.body == 'A'*100

    # A zero-length body is complete immediately
    ld = http.LengthData(0)
    assert ld.complete
    assert ld.body == ''

    # One byte short is still incomplete
    ld = http.LengthData(100)
    ld.add_data('A'*99)
    assert not ld.complete


def test_length_one_character():
    """``LengthData`` works when fed one character at a time."""
    ld = http.LengthData(100)
    for _ in range(100):
        ld.add_data('A')
    assert ld.complete
    assert ld.body == 'A'*100

    # One character short is still incomplete
    ld = http.LengthData(100)
    for _ in range(99):
        ld.add_data('A')
    assert not ld.complete


def test_length_overflow():
    """Extra data is truncated; adding after completion raises."""
    # Only the declared number of characters is kept
    ld = http.LengthData(100)
    ld.add_data('A'*400)
    assert ld.complete
    assert ld.body == 'A'*100

    # Adding data once complete raises PappyException
    ld = http.LengthData(100)
    ld.add_data('A'*100)
    with pytest.raises(PappyException):
        ld.add_data('A')


def test_repeatable_dict_simple():
    """Item access returns the last value; all values/pairs keep order."""
    d = http.RepeatableDict()
    assert 'foo' not in d
    d['foo'] = 'bar'
    assert 'foo' in d
    d['baz'] = 'fuzz'
    d.append('foo', 'fizz')
    assert d['foo'] == 'fizz'
    assert d['baz'] == 'fuzz'
    assert d.all_vals('foo') == ['bar', 'fizz']
    assert d.all_pairs() == [('foo', 'bar'),
                             ('baz', 'fuzz'),
                             ('foo', 'fizz')]
    assert 'fee' not in d
    d.add_pairs([('fee', 'fi'),
                 ('foo', 'fo')])
    assert 'fee' in d
    assert d['fee'] == 'fi'
    assert d['baz'] == 'fuzz'
    assert d['foo'] == 'fo'
    assert d.all_vals('foo') == ['bar', 'fizz', 'fo']
    assert d.all_pairs() == [('foo', 'bar'),
                             ('baz', 'fuzz'),
                             ('foo', 'fizz'),
                             ('fee', 'fi'),
                             ('foo', 'fo')]


def test_repeatable_dict_constructor():
    """Pairs passed to the constructor are stored in order."""
    d = http.RepeatableDict([('foo', 'bar'), ('baz', 'fuzz')])
    assert 'foo' in d
    assert d['foo'] == 'bar'
    assert d['baz'] == 'fuzz'
    assert d.all_vals('foo') == ['bar']
    assert d.all_pairs() == [('foo', 'bar'),
                             ('baz', 'fuzz')]


def test_repeatable_dict_case_insensitive():
    """Case-insensitive dicts match keys of any case, however populated."""
    def check(d):
        assert 'foo' in d
        assert 'fOo' in d
        assert d['foo'] == 'fuzz'
        assert d['Foo'] == 'fuzz'
        assert d['FoO'] == 'fuzz'

        assert d.all_vals('foo') == ['bar', 'fuzz']
        assert d.all_vals('Foo') == ['bar', 'fuzz']
        assert d.all_vals('FoO') == ['bar', 'fuzz']

        # The original key spelling is preserved in the stored pairs
        assert d.all_pairs() == [('foo', 'bar'),
                                 ('fOo', 'fuzz')]

    d = http.RepeatableDict([('foo', 'bar'), ('fOo', 'fuzz')],
                            case_insensitive=True)
    check(d)

    d = http.RepeatableDict(case_insensitive=True)
    d['foo'] = 'bar'
    d.append('fOo', 'fuzz')
    check(d)

    d = http.RepeatableDict(case_insensitive=True)
    d.add_pairs([('foo', 'bar'), ('fOo', 'fuzz')])
    check(d)


def test_repeatable_dict_overwrite():
    """Item assignment replaces every existing value for the key."""
    d = http.RepeatableDict([('foo', 'bar'), ('foo', 'fuzz'), ('bar', 'baz')])
    d['foo'] = 'asdf'
    assert d.all_vals('foo') == ['asdf']


def test_repeatable_dict_deletion():
    """Deleting a key removes all of its (case-insensitive) values."""
    d = http.RepeatableDict([('foo', 'bar'), ('fOo', 'fuzz'), ('bar', 'baz')],
                            case_insensitive=True)
    assert 'foo' in d
    del d['foo']
    assert 'foo' not in d

    with pytest.raises(KeyError):
        d['foo']
    with pytest.raises(KeyError):
        d['fOo']

    assert d['bar'] == 'baz'
    assert d.all_vals('foo') == []


def test_repeatable_dict_callback():
    """The modify callback fires on item set, ``add_pairs`` and ``update``."""
    def f():
        raise TException()

    r = http.RepeatableDict()
    # No callback set yet: none of these may raise
    r['a'] = 'b'
    r.add_pairs([('c', 'd')])
    r.update('a', 'c')

    r.set_modify_callback(f)
    with pytest.raises(TException):
        r['a'] = 'b'
    with pytest.raises(TException):
        r.add_pairs([('c', 'd')])
    with pytest.raises(TException):
        r.update('a', 'c')


####################
## Cookies

def test_response_cookie_simple():
    """A bare ``key=value;`` cookie has no attributes set."""
    s = 'ck=1234;'
    c = http.ResponseCookie(s)
    assert c.key == 'ck'
    assert c.val == '1234'
    assert not c.secure
    assert not c.http_only
    assert c.domain is None
    assert c.expires is None
    assert c.max_age is None
    assert c.path is None


def test_response_cookie_params():
    """All cookie attributes are parsed from a spaced attribute list."""
    s = 'ck=1234; Expires=Wed, 09 Jun 2021 10:18:14 GMT; secure; httponly; path=/; max-age=12; domain=.foo.bar'
    c = http.ResponseCookie(s)
    assert c.key == 'ck'
    assert c.val == '1234'
    assert c.domain == '.foo.bar'
    assert c.expires == 'Wed, 09 Jun 2021 10:18:14 GMT'
    assert c.http_only
    assert c.max_age == 12
    assert c.path == '/'
    assert c.secure


def test_response_cookie_parsing():
    """Values containing ``=`` and unspaced attribute lists are parsed."""
    s = 'ck=1234=567;Expires=Wed, 09 Jun 2021 10:18:14 GMT;secure;httponly;path=/;max-age=12;domain=.foo.bar'
    c = http.ResponseCookie(s)
    assert c.key == 'ck'
    assert c.val == '1234=567'
    assert c.domain == '.foo.bar'
    assert c.expires == 'Wed, 09 Jun 2021 10:18:14 GMT'
    assert c.http_only
    assert c.max_age == 12
    assert c.path == '/'
    assert c.secure


def test_response_cookie_blank():
    """Cookies with a blank or value-less first segment still parse."""
    # Don't ask why this exists, I've run into it
    s = ' ; path=/; secure'
    c = http.ResponseCookie(s)
    assert c.key == ''
    assert c.val == ''
    assert c.path == '/'
    assert c.secure

    s = '; path=/; secure'
    c = http.ResponseCookie(s)
    assert c.key == ''
    assert c.val == ''
    assert c.path == '/'
    assert c.secure

    s = 'asdf; path=/; secure'
    c = http.ResponseCookie(s)
    assert c.key == 'asdf'
    assert c.val == ''
    assert c.path == '/'
    assert c.secure


####################
## HTTPMessage tests

def test_message_simple():
    """A full message passed to the constructor is parsed completely."""
    raw = ('foobar\r\n'
           'a: b\r\n'
           'Content-Length: 100\r\n\r\n')
    raw += 'A'*100
    m = http.HTTPMessage(raw)
    assert m.complete
    assert m.malformed == False
    assert m.start_line == 'foobar'
    assert m.body == 'A'*100
    assert m.headers.all_pairs() == [('a', 'b'), ('Content-Length', '100')]
    assert m.headers['A'] == 'b'
    assert m.headers_section == ('foobar\r\n'
                                 'a: b\r\n'
                                 'Content-Length: 100\r\n\r\n')
    assert m.full_message == raw


def test_message_build():
    """A message built with ``add_line``/``add_data`` matches the raw form."""
    raw = ('foobar\r\n'
           'a: b\r\n'
           'Content-Length: 100\r\n\r\n')
    raw += 'A'*100
    m = http.HTTPMessage()
    m.add_line('foobar')
    m.add_line('a: b')
    m.add_line('Content-Length: 100')
    m.add_line('')
    assert not m.complete
    m.add_data('A'*50)
    assert not m.complete
    m.add_data('A'*50)
    assert m.complete
    assert m.malformed == False
    assert m.start_line == 'foobar'
    assert m.body == 'A'*100
    assert m.headers.all_pairs() == [('a', 'b'), ('Content-Length', '100')]
    assert m.headers['A'] == 'b'
    assert m.headers_section == ('foobar\r\n'
                                 'a: b\r\n'
                                 'Content-Length: 100\r\n\r\n')
    assert m.full_message == raw


def test_message_build_chunked():
    """A chunked message is exposed with a Content-Length header instead."""
    # Expected result: the chunked body re-expressed with Content-Length
    raw = ('foobar\r\n'
           'a: b\r\n'
           'Content-Length: 100\r\n\r\n')
    raw += 'A'*100
    m = http.HTTPMessage()
    m.add_line('foobar')
    m.add_line('a: b')
    m.add_line('Transfer-Encoding: chunked')
    m.add_line('')
    assert not m.complete
    m.add_data('%x\r\n' % 50)
    m.add_data('A'*50)
    m.add_data('\r\n')
    m.add_data('%x\r\n' % 50)
    m.add_data('A'*50)
    m.add_data('\r\n')
    m.add_data('0\r\n')
    assert m.complete
    assert m.malformed == False
    assert m.start_line == 'foobar'
    assert m.body == 'A'*100
    assert m.headers.all_pairs() == [('a', 'b'), ('Content-Length', '100')]
    assert m.headers['A'] == 'b'
    assert m.headers_section == ('foobar\r\n'
                                 'a: b\r\n'
                                 'Content-Length: 100\r\n\r\n')
    assert m.full_message == raw


def test_message_badheader():
    """A header line without a colon is stored with a ``None`` value."""
    raw = ('startline\r\n'
           'a: b\r\n'
           'Content-Encoding\r\n'
           'd: e\r\n'
           'f:g\r\n'
           '\r\n')
    m = http.HTTPMessage(raw)
    assert m.headers['a'] == 'b'
    assert m.headers['content-encoding'] is None
    assert m.headers['f'] == 'g'


####################
## Request tests

def test_request_simple():
    """A basic request exposes its start line, headers, host and body."""
    header_lines = [
        'GET / HTTP/1.1',
        'Content-Type: text/xml; charset="utf-8"',
        'Accept-Encoding: gzip,deflate',
        'User-Agent: TestAgent',
        'Host: www.test.com',
        'Content-Length: 100',
        'Connection: Keep-Alive',
        'Cache-Control: no-cache',
        '',
    ]
    headers = '\r\n'.join(header_lines)+'\r\n'
    data = 'A'*100
    rf, rl, ru, rj = req_by_lines_and_full(header_lines, data)

    def check(r):
        assert r.complete
        assert r.fragment is None
        assert r.full_request == headers+data
        assert r.headers_complete
        assert r.host == 'www.test.com'
        assert r.is_ssl == False
        assert r.path == '/'
        assert r.port == 80
        assert r.start_line == 'GET / HTTP/1.1'
        assert r.verb == 'GET'
        assert r.version == 'HTTP/1.1'
        assert r.headers['Content-Length'] == '100'
        assert r.headers['CoNtent-lENGTH'] == '100'
        assert r.headers['Content-Type'] == 'text/xml; charset="utf-8"'
        assert r.headers['Accept-Encoding'] == 'gzip,deflate'
        assert r.headers['User-Agent'] == 'TestAgent'
        assert r.headers['Host'] == 'www.test.com'
        assert r.headers['Connection'] == 'Keep-Alive'
        assert r.headers['Cache-Control'] == 'no-cache'
        assert r.body == 'A'*100

    for r in (rf, rl, ru, rj):
        check(r)


def test_request_urlparams():
    """URL parameters and the fragment are parsed from the path."""
    header_lines = [
        'GET /?p1=foo&p2=bar#frag HTTP/1.1',
        'Content-Length: 0',
        '',
    ]
    rf, rl, ru, rj = req_by_lines_and_full(header_lines)

    def check(r):
        assert r.complete
        assert r.fragment == 'frag'
        assert r.url_params['p1'] == 'foo'
        assert r.url_params['p2'] == 'bar'
        assert r.full_request == ('GET /?p1=foo&p2=bar#frag HTTP/1.1\r\n'
                                  'Content-Length: 0\r\n'
                                  '\r\n')

    for r in (rf, rl, ru, rj):
        check(r)


def test_request_questionmark_url():
    """Only the first ``?`` in the path starts the URL parameters."""
    header_lines = [
        'GET /path/??/to/?p1=foo&p2=bar#frag HTTP/1.1',
        'Content-Length: 0',
        '',
    ]
    rf, rl, ru, rj = req_by_lines_and_full(header_lines)

    def check(r):
        assert r.complete
        assert r.fragment == 'frag'
        assert r.url_params['?/to/?p1'] == 'foo'
        assert r.url_params['p2'] == 'bar'
        assert r.full_request == ('GET /path/??/to/?p1=foo&p2=bar#frag HTTP/1.1\r\n'
                                  'Content-Length: 0\r\n'
                                  '\r\n')

    for r in (rf, rl, ru, rj):
        check(r)


def test_request_postparams():
    """A urlencoded body is exposed through ``post_params``."""
    header_lines = [
        'GET / HTTP/1.1',
        'Content-Length: 9',
        'Content-Type: application/x-www-form-urlencoded',
        '',
    ]
    data = 'a=b&c=dee'
    rf, rl, ru, rj = req_by_lines_and_full(header_lines, data)

    def check(r):
        assert r.complete
        assert r.post_params['a'] == 'b'
        assert r.post_params['c'] == 'dee'

    for r in (rf, rl, ru, rj):
        check(r)


def test_post_params_update():
    """Changing ``post_params`` rewrites the request body."""
    r = http.Request(('GET / HTTP/1.1\r\n'
                      'Content-Type: application/x-www-form-urlencoded\r\n'
                      'Content-Length: 7\r\n\r\n'
                      'a=b&c=d'))
    r.post_params['c'] = 'e'
    assert r.full_request == ('GET / HTTP/1.1\r\n'
                              'Content-Type: application/x-www-form-urlencoded\r\n'
                              'Content-Length: 7\r\n\r\n'
                              'a=b&c=e')
    r.post_params['a'] = 'f'
    assert r.full_request == ('GET / HTTP/1.1\r\n'
                              'Content-Type: application/x-www-form-urlencoded\r\n'
                              'Content-Length: 7\r\n\r\n'
                              'a=f&c=e')


def test_headers_end():
    """Headers can be complete while the body is still outstanding."""
    header_lines = [
        'GET / HTTP/1.1',
        'Content-Type: text/xml; charset="utf-8"',
        'Accept-Encoding: gzip,deflate',
        'User-Agent: TestAgent',
        'Host: www.test.com',
        'Content-Length: 100',
        'Connection: Keep-Alive',
        'Cache-Control: no-cache',
        '',
    ]
    r = http.Request()
    for line in header_lines:
        r.add_line(line)
    assert not r.complete
    assert r.headers_complete


def test_request_cookies():
    """Request cookies are parsed; the last repeated cookie wins."""
    header_lines = [
        'GET /?p1=foo&p2=bar#frag HTTP/1.1',
        'Content-Length: 0',
        'Cookie: abc=WRONG; def=456; ghi=789; abc=123',
        '',
    ]
    rf, rl, ru, rj = req_by_lines_and_full(header_lines)

    def check(r):
        assert r.complete
        assert r.cookies['abc'] == '123'
        assert r.cookies['def'] == '456'
        assert r.cookies['ghi'] == '789'
        assert r.cookies.all_vals('abc') == ['WRONG', '123']

    for r in (rf, rl, ru, rj):
        check(r)


def test_request_parse_host():
    """A ``host:443`` Host header sets host, port and ``is_ssl``."""
    header_lines = [
        'GET / HTTP/1.1',
        'Content-Length: 0',
        'Host: www.test.com:443',
        '',
    ]
    rf, rl, ru, rj = req_by_lines_and_full(header_lines)

    def check(r):
        assert r.complete
        assert r.port == 443
        assert r.host == 'www.test.com'
        assert r.is_ssl

    for r in (rf, rl, ru, rj):
        check(r)


def test_request_newline_delim():
    """Bare ``\\n`` line endings are normalized in ``full_request``."""
    r = http.Request(('GET / HTTP/1.1\n'
                      'Content-Length: 4\n'
                      'Test-Header: foo\r\n'
                      'Other-header: bar\n\r\n'
                      'AAAA'))
    assert r.full_request == ('GET / HTTP/1.1\r\n'
                              'Content-Length: 4\r\n'
                              'Test-Header: foo\r\n'
                              'Other-header: bar\r\n\r\n'
                              'AAAA')


def test_repeated_request_headers():
    """For a repeated request header, the last value is returned."""
    header_lines = [
        'GET /?p1=foo&p2=bar#frag HTTP/1.1',
        'Content-Length: 0',
        'Test-Header: WRONG',
        'Test-Header: RIGHTiguess',
        '',
    ]
    rf, rl, ru, rj = req_by_lines_and_full(header_lines)

    def check(r):
        assert r.complete
        assert r.headers['test-header'] == 'RIGHTiguess'

    for r in (rf, rl, ru, rj):
        check(r)


def test_request_update_statusline():
    """Setting ``start_line`` updates verb, path and version."""
    r = http.Request()
    r.start_line = 'GET / HTTP/1.1'
    assert r.verb == 'GET'
    assert r.path == '/'
    assert r.version == 'HTTP/1.1'
    assert not r.complete

    assert r.full_request == 'GET / HTTP/1.1\r\n\r\n'


def test_request_update_cookies():
    """Changes to ``cookies`` are reflected in the Cookie header."""
    r = http.Request()
    r.start_line = 'GET / HTTP/1.1'

    # Check new cookies
    r.cookies['foo'] = 'bar'
    r.cookies['baz'] = 'fuzz'
    assert r.full_request == ('GET / HTTP/1.1\r\n'
                              'Cookie: foo=bar; baz=fuzz\r\n'
                              '\r\n')

    # Check updated cookies (should be updated in place)
    r.cookies['foo'] = 'buzz'
    assert r.full_request == ('GET / HTTP/1.1\r\n'
                              'Cookie: foo=buzz; baz=fuzz\r\n'
                              '\r\n')

    # Check repeated cookies
    r.cookies.append('foo', 'bar')
    assert r.full_request == ('GET / HTTP/1.1\r\n'
                              'Cookie: foo=buzz; baz=fuzz; foo=bar\r\n'
                              '\r\n')


def test_request_update_headers():
    """Headers set or appended appear in order in ``full_request``."""
    r = http.Request()
    r.start_line = 'GET / HTTP/1.1'
    r.headers['Content-Length'] = '0'
    r.headers['Test-Header'] = 'Test Value'
    r.headers['Other-Header'] = 'Other Value'
    r.headers['Host'] = 'www.test.com'
    r.headers.append('Test-Header', 'Test Value2')
    assert r.full_request == ('GET / HTTP/1.1\r\n'
                              'Content-Length: 0\r\n'
                              'Test-Header: Test Value\r\n'
                              'Other-Header: Other Value\r\n'
                              'Host: www.test.com\r\n'
                              'Test-Header: Test Value2\r\n'
                              '\r\n')
    assert r.host == 'www.test.com'


def test_request_modified_headers():
    """Body and cookie changes rewrite headers that were set by hand."""
    r = http.Request()
    r.start_line = 'GET / HTTP/1.1'
    r.headers['content-length'] = '100'
    r.headers['cookie'] = 'abc=123'
    r.cookies['abc'] = '456'
    r.body = 'AAAA'
    assert r.full_request == ('GET / HTTP/1.1\r\n'
                              'content-length: 4\r\n'
                              'cookie: abc=456\r\n\r\n'
                              'AAAA')
    assert r.headers['content-length'] == '4'
    assert r.headers['cookie'] == 'abc=456'


def test_request_update_data():
    """Setting ``body`` corrects a stale content-length header."""
    r = http.Request()
    r.start_line = 'GET / HTTP/1.1'
    r.headers['content-length'] = 500
    r.body = 'AAAA'
    assert r.full_request == ('GET / HTTP/1.1\r\n'
                              'content-length: 4\r\n'
                              '\r\n'
                              'AAAA')


def test_request_to_json():
    """``Request.to_json`` serializes the request and its response id."""
    r = http.Request()
    r.start_line = 'GET / HTTP/1.1'
    r.headers['content-length'] = 500
    r.tags = {'foo', 'bar'}
    r.body = 'AAAA'
    r.reqid = '1'

    rsp = http.Response()
    rsp.start_line = 'HTTP/1.1 200 OK'
    rsp.rspid = '2'

    r.response = rsp

    expected_reqdata = {
        'full_message': unicode(base64.b64encode(r.full_request)),
        'response_id': str(rsp.rspid),
        'port': 80,
        'is_ssl': False,
        'reqid': str(r.reqid),
        'host': '',
    }

    actual = json.loads(r.to_json())
    # ``tags`` is assigned as a set above, so the order of the serialized
    # list is not guaranteed; compare it independently of order.
    assert sorted(actual.pop('tags')) == ['bar', 'foo']
    assert actual == expected_reqdata


def test_request_update_content_length():
    """``update_content_length=True`` fixes a wrong Content-Length."""
    r = http.Request(('GET / HTTP/1.1\r\n'
                      'Content-Length: 4\r\n\r\n'
                      'AAAAAAAAAA'), update_content_length=True)

    assert r.full_request == (('GET / HTTP/1.1\r\n'
                               'Content-Length: 10\r\n\r\n'
                               'AAAAAAAAAA'))


def test_request_blank_url_params():
    """Value-less URL parameters round-trip and read back as ``None``."""
    r = http.Request()
    r.add_line('GET /this/??-asdf/ HTTP/1.1')
    assert r.full_request == ('GET /this/??-asdf/ HTTP/1.1\r\n\r\n')

    r = http.Request()
    r.add_line('GET /this/??-asdf/?a=b&c&d=ef HTTP/1.1')
    assert r.full_request == ('GET /this/??-asdf/?a=b&c&d=ef HTTP/1.1\r\n\r\n')
    assert r.url_params['?-asdf/?a'] == 'b'
    assert r.url_params['c'] is None
    assert r.url_params['d'] == 'ef'


def test_request_blank():
    """A request made only of line terminators serializes to ``''``."""
    r = http.Request('\r\n\n\n')
    assert r.full_request == ''


def test_request_blank_headers():
    """Headers with an empty value read back as the empty string."""
    r = http.Request(('GET / HTTP/1.1\r\n'
                      'Header: \r\n'
                      'Header2:\r\n'))

    assert r.headers['header'] == ''
    assert r.headers['header2'] == ''


def test_request_blank_cookies():
    """Blank or value-less cookie segments parse to empty strings."""
    r = http.Request(('GET / HTTP/1.1\r\n'
                      'Cookie: \r\n'))
    assert r.cookies[''] == ''

    r = http.Request(('GET / HTTP/1.1\r\n'
                      'Cookie: a=b; ; c=d\r\n'))
    assert r.cookies[''] == ''

    r = http.Request(('GET / HTTP/1.1\r\n'
                      'Cookie: a=b; foo; c=d\r\n'))
    assert r.cookies['foo'] == ''


def test_request_set_url():
    """Assigning ``url`` derives host, port and ``is_ssl``."""
    r = http.Request('GET / HTTP/1.1\r\n')
    r.url = 'www.AAAA.BBBB'
    assert r.host == 'www.AAAA.BBBB'
    assert r.port == 80
    assert not r.is_ssl

    r.url = 'https://www.AAAA.BBBB'
    assert r.host == 'www.AAAA.BBBB'
    assert r.port == 443
    assert r.is_ssl

    r.url = 'https://www.AAAA.BBBB:1234'
    assert r.host == 'www.AAAA.BBBB'
    assert r.port == 1234
    assert r.is_ssl

    # An explicit http scheme wins over the port number
    r.url = 'http://www.AAAA.BBBB:443'
    assert r.host == 'www.AAAA.BBBB'
    assert r.port == 443
    assert not r.is_ssl

    # With no scheme, port 443 implies SSL
    r.url = 'www.AAAA.BBBB:443'
    assert r.host == 'www.AAAA.BBBB'
    assert r.port == 443
    assert r.is_ssl


def test_request_set_url_params():
    """``url`` round-trips parameters, fragment, port and scheme."""
    r = http.Request('GET / HTTP/1.1\r\n')
    r.url = 'www.AAAA.BBBB?a=b&c=d#foo'
    assert r.url_params.all_pairs() == [('a', 'b'), ('c', 'd')]
    assert r.fragment == 'foo'
    assert r.url == 'http://www.AAAA.BBBB?a=b&c=d#foo'
    r.port = 400
    assert r.url == 'http://www.AAAA.BBBB:400?a=b&c=d#foo'
    r.is_ssl = True
    assert r.url == 'https://www.AAAA.BBBB:400?a=b&c=d#foo'


def test_request_copy():
    """``copy.copy`` of a request serializes identically."""
    r = http.Request(('GET / HTTP/1.1\r\n'
                      'Content-Length: 4\r\n\r\n'
                      'AAAA'))
    r2 = copy.copy(r)
    assert r2.full_request == ('GET / HTTP/1.1\r\n'
                               'Content-Length: 4\r\n\r\n'
                               'AAAA')


def test_request_url_blankpath():
    """URL params set on a URL without a path are reflected in the URL."""
    r = http.Request()
    r.start_line = 'GET / HTTP/1.1'
    r.url = 'https://www.google.com'
    r.headers['Host'] = r.host
    r.url_params.from_dict({'foo': 'bar'})

    assert r.full_path == '/?foo=bar'
    assert r.url == 'https://www.google.com?foo=bar'


def test_request_modify_header2():
    """Header and post-param edits on a copy keep header order intact."""
    r = http.Request(('POST /some/path HTTP/1.1\r\n'
                      'Host: test.host.thing\r\n'
                      'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.10; rv:43.0) Gecko/20100101 Firefox/43.0\r\n'
                      'Content-Length: 282\r\n'
                      'Connection: keep-alive\r\n'
                      '\r\n'
                      'a|b|c|d'))
    r2 = r.copy()
    r2.headers['User-Agent'] = 'Moziller/6.9'
    assert r2.full_message == ('POST /some/path HTTP/1.1\r\n'
                               'Host: test.host.thing\r\n'
                               'User-Agent: Moziller/6.9\r\n'
                               'Content-Length: 7\r\n'
                               'Connection: keep-alive\r\n'
                               '\r\n'
                               'a|b|c|d')

    r2.post_params['foo'] = 'barr'
    assert r2.full_message == ('POST /some/path HTTP/1.1\r\n'
                               'Host: test.host.thing\r\n'
                               'User-Agent: Moziller/6.9\r\n'
                               'Content-Length: 8\r\n'
                               'Connection: keep-alive\r\n'
                               'Content-Type: application/x-www-form-urlencoded\r\n'
                               '\r\n'
                               'foo=barr')


def test_request_absolute_url():
    """``PATH_ABSOLUTE`` puts the scheme and host into the request line."""
    r = http.Request(('GET /foo/path HTTP/1.1\r\n'
                      'Host: www.example.faketld\r\n\r\n'))
    assert r.full_message == ('GET /foo/path HTTP/1.1\r\n'
                              'Host: www.example.faketld\r\n\r\n')
    r.path_type = http.PATH_ABSOLUTE
    assert r.full_message == ('GET http://www.example.faketld/foo/path HTTP/1.1\r\n'
                              'Host: www.example.faketld\r\n\r\n')
    r.is_ssl = True
    assert r.full_message == ('GET https://www.example.faketld/foo/path HTTP/1.1\r\n'
                              'Host: www.example.faketld\r\n\r\n')


def test_proxy_auth():
    """``proxy_creds`` adds a Basic Proxy-Authorization header."""
    r = http.Request(('GET /foo/path HTTP/1.1\r\n'
                      'Host: www.example.faketld\r\n\r\n'))
    r.proxy_creds = ('username', 'pass:word')
    assert r.full_message == ('GET /foo/path HTTP/1.1\r\n'
                              'Host: www.example.faketld\r\n'
                              'Proxy-Authorization: Basic dXNlcm5hbWU6cGFzczp3b3Jk\r\n\r\n')


def test_request_connect_request():
    """``connect_request`` is ``None`` without SSL, a CONNECT with it."""
    r = http.Request(('GET /foo/path HTTP/1.1\r\n'
                      'Host: www.example.faketld\r\n\r\n'))
    assert r.connect_request is None
    r.is_ssl = True
    assert r.connect_request.full_message == ('CONNECT www.example.faketld:443 HTTP/1.1\r\n'
                                              'Host: www.example.faketld\r\n\r\n')


def test_request_connect_request_creds():
    """The CONNECT request carries the proxy credentials."""
    r = http.Request(('GET /foo/path HTTP/1.1\r\n'
                      'Host: www.example.faketld\r\n\r\n'))
    r.is_ssl = True
    r.proxy_creds = ('username', 'pass:word')
    assert r.connect_request.full_message == ('CONNECT www.example.faketld:443 HTTP/1.1\r\n'
                                              'Host: www.example.faketld\r\n'
                                              'Proxy-Authorization: Basic dXNlcm5hbWU6cGFzczp3b3Jk\r\n\r\n')


####################
## Response tests

def test_response_simple():
    """A basic response exposes its status line, headers and body."""
    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Content-Length: 100',
        '',
    ]
    data = 'A'*100
    rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data)

    def check(r):
        assert r.complete
        assert r.body == data
        assert r.response_code == 200
        assert r.response_text == 'OK'
        assert r.start_line == 'HTTP/1.1 200 OK'
        assert r.version == 'HTTP/1.1'

        assert r.headers['Date'] == 'Thu, 22 Oct 2015 00:37:17 GMT'
        assert r.headers['Cache-Control'] == 'private, max-age=0'
        assert r.headers['Content-Type'] == 'text/html; charset=UTF-8'
        assert r.headers['Server'] == 'gws'
        assert r.headers['Content-Length'] == '100'
        assert r.headers['CoNTEnT-leNGTH'] == '100'

    for r in (rf, rl, ru, rj):
        check(r)


def test_response_chunked():
    """A chunked response body is reassembled from its chunks."""
    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Transfer-Encoding: chunked',
        '',
    ]
    data = 'af\r\n'
    data += 'A'*0xAF + '\r\n'
    data += 'BF\r\n'
    data += 'B'*0xBF + '\r\n'
    data += '0\r\n\r\n'

    rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data)

    def check(r):
        assert r.complete
        assert r.body == 'A'*0xAF + 'B'*0xBF

    for r in (rf, rl, ru, rj):
        check(r)


def test_response_gzip():
    """A gzip-encoded response body is exposed decompressed."""
    data_decomp = 'Hello woru!'
    data_comp = gzip_string(data_decomp)

    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Content-Encoding: gzip',
        'Content-Length: %d' % len(data_comp),
        '',
    ]

    rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data_comp)

    def check(r):
        assert r.complete
        assert r.body == data_decomp

    for r in (rf, rl, ru, rj):
        check(r)


def test_response_deflate():
    """A deflate-encoded response body is exposed decompressed."""
    data_decomp = 'Hello woru!'
    data_comp = deflate_string(data_decomp)

    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Content-Encoding: deflate',
        'Content-Length: %d' % len(data_comp),
        '',
    ]

    rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data_comp)

    def check(r):
        assert r.complete
        assert r.body == data_decomp

    for r in (rf, rl, ru, rj):
        check(r)


def test_response_chunked_gzip():
    """A chunked gzip response is dechunked and decompressed."""
    data_decomp = 'Hello world!'
    data_comp = gzip_string(data_decomp)
    # The compressed data is split after its third byte into two chunks
    assert len(data_comp) > 3
    data_chunked = '3\r\n'
    data_chunked += data_comp[:3]
    data_chunked += '\r\n%x\r\n' % (len(data_comp[3:]))
    data_chunked += data_comp[3:]
    data_chunked += '\r\n0\r\n'

    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Content-Encoding: gzip',
        'Transfer-Encoding: chunked',
        '',
    ]

    rf, rl, ru, rj = rsp_by_lines_and_full(header_lines, data_chunked)

    def check(r):
        assert r.complete
        assert r.body == data_decomp
        assert r.headers['Content-Length'] == str(len(data_decomp))
        # The encoding headers are replaced by a plain Content-Length
        assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                                   'Date: Thu, 22 Oct 2015 00:37:17 GMT\r\n'
                                   'Cache-Control: private, max-age=0\r\n'
                                   'Content-Type: text/html; charset=UTF-8\r\n'
                                   'Server: gws\r\n'
                                   'Content-Length: %d\r\n\r\n'
                                   '%s') % (len(data_decomp), data_decomp)

    for r in (rf, rl, ru, rj):
        check(r)


def test_response_early_completion():
    """A zero-length response completes as soon as the headers end."""
    r = http.Response()
    r.start_line = 'HTTP/1.1 200 OK'
    r.add_line('Content-Length: 0')
    assert not r.complete
    r.add_line('')
    assert r.complete


def test_response_cookies():
    """Set-Cookie headers are parsed into cookie objects keyed by name."""
    header_lines = [
        'HTTP/1.1 200 OK',
        'Content-Length: 0',
        'Set-Cookie: ck=1234=567;Expires=Wed, 09 Jun 2021 10:18:14 GMT;secure;httponly;path=/;max-age=12;domain=.foo.bar',
        'Set-Cookie: abc=123',
        'Set-Cookie: def=456',
        '',
    ]

    rf, rl, ru, rj = rsp_by_lines_and_full(header_lines)

    def check(r):
        assert r.complete
        assert r.cookies['ck'].key == 'ck'
        assert r.cookies['ck'].val == '1234=567'
        assert r.cookies['ck'].domain == '.foo.bar'
        assert r.cookies['ck'].expires == 'Wed, 09 Jun 2021 10:18:14 GMT'
        assert r.cookies['ck'].http_only
        assert r.cookies['ck'].max_age == 12
        assert r.cookies['ck'].path == '/'
        assert r.cookies['ck'].secure

        assert r.cookies['abc'].val == '123'
        assert r.cookies['def'].val == '456'

    for r in (rf, rl, ru, rj):
        check(r)


def test_response_repeated_cookies():
    """Repeated Set-Cookie names are all kept, in header order."""
    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Set-Cookie: foo=bar\r\n'
                       'Set-Cookie: baz=buzz\r\n'
                       'Set-Cookie: foo=buzz\r\n'
                       '\r\n'))
    expected_pairs = [('foo', 'bar'), ('baz', 'buzz'), ('foo', 'buzz')]
    check_response_cookies(expected_pairs, r)


def test_repeated_response_headers():
    """For a repeated response header, the last value is returned."""
    # Repeated headers can be used for attacks, so ironically we have to handle
    # them well. We always use the last header as the correct one.
    header_lines = [
        'HTTP/1.1 200 OK',
        'Content-Length: 0',
        'Test-Head: WRONG',
        'Test-Head: RIGHTish',
        '',
    ]

    rf, rl, ru, rj = rsp_by_lines_and_full(header_lines)

    def check(r):
        assert r.complete
        assert r.headers['test-head'] == 'RIGHTish'

    for r in (rf, rl, ru, rj):
        check(r)


def test_response_update_statusline():
    """Setting ``start_line`` updates version, code and text."""
    r = http.Response()
    r.start_line = 'HTTP/1.1 200 OK'
    assert r.version == 'HTTP/1.1'
    assert r.response_code == 200
    assert r.response_text == 'OK'
    assert not r.complete

    assert r.full_response == 'HTTP/1.1 200 OK\r\n\r\n'


def test_response_update_headers():
    """Headers set or appended appear in order in ``full_response``."""
    r = http.Response()
    r.start_line = 'HTTP/1.1 200 OK'
    r.headers['Test-Header'] = 'Test Value'
    r.headers['Other-Header'] = 'Other Value'

    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Test-Header: Test Value\r\n'
                               'Other-Header: Other Value\r\n\r\n')

    r.headers.append('Test-Header', 'Other Test Value')
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Test-Header: Test Value\r\n'
                               'Other-Header: Other Value\r\n'
                               'Test-Header: Other Test Value\r\n\r\n')


def test_response_update_modified_headers():
    """Setting ``body`` corrects a stale content-length header."""
    r = http.Response()
    r.start_line = 'HTTP/1.1 200 OK'
    r.headers['content-length'] = '500'
    r.body = 'AAAA'
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'content-length: 4\r\n\r\n'
                               'AAAA')
    assert r.headers['content-length'] == '4'


def test_response_update_cookies():
    """Cookies and Set-Cookie headers stay in sync in both directions."""
    r = http.Response()
    r.start_line = 'HTTP/1.1 200 OK'
    # Test by adding headers
    r.headers['Set-Cookie'] = 'abc=123'
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Set-Cookie: abc=123\r\n\r\n')
    assert r.cookies['abc'].val == '123'
    r.headers.append('Set-Cookie', 'abc=456')
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Set-Cookie: abc=123\r\n'
                               'Set-Cookie: abc=456\r\n\r\n'
                               )
    assert r.cookies['abc'].val == '456'

    r = http.Response()
    r.start_line = 'HTTP/1.1 200 OK'
    # Test by adding cookie objects
    c = http.ResponseCookie('abc=123; secure')
    r.cookies['abc'] = c
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Set-Cookie: abc=123; secure\r\n\r\n')


def test_response_update_content_length():
    """``update_content_length=True`` fixes a wrong Content-Length."""
    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Content-Length: 4\r\n\r\n'
                       'AAAAAAAAAA'), update_content_length=True)

    assert r.full_response == (('HTTP/1.1 200 OK\r\n'
                                'Content-Length: 10\r\n\r\n'
                                'AAAAAAAAAA'))


def test_response_to_json():
    """``Response.to_json`` serializes the message and its id."""
    rsp = http.Response()
    rsp.start_line = 'HTTP/1.1 200 OK'
    rsp.rspid = 2

    expected_reqdata = {
        'full_message': base64.b64encode(rsp.full_response),
        'rspid': rsp.rspid,
    }

    assert json.loads(rsp.to_json()) == expected_reqdata


def test_response_update_from_objects_cookies():
    """Replacing all cookies moves the Set-Cookie headers to the end."""
    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Set-Cookie: foo=bar\r\n'
                       'Set-Cookie: baz=buzz\r\n'
                       'Header: out of fucking nowhere\r\n'
                       'Set-Cookie: foo=buzz\r\n'
                       '\r\n'))
    expected_pairs = [('foo', 'bar'), ('baz', 'buzz'), ('foo', 'buzz')]
    check_response_cookies(expected_pairs, r)

    new_pairs = [('foo', http.ResponseCookie('foo=banana')),
                 ('baz', http.ResponseCookie('baz=buzz')),
                 ('scooby', http.ResponseCookie('scooby=doo')),
                 ('foo', http.ResponseCookie('foo=boo'))]
    r.cookies.clear()
    r.cookies.add_pairs(new_pairs)

    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Header: out of fucking nowhere\r\n'
                               'Set-Cookie: foo=banana\r\n'
                               'Set-Cookie: baz=buzz\r\n'
                               'Set-Cookie: scooby=doo\r\n'
                               'Set-Cookie: foo=boo\r\n'
                               '\r\n')
    expected_pairs = [('foo', 'banana'), ('baz', 'buzz'), ('scooby', 'doo'), ('foo', 'boo')]
    check_response_cookies(expected_pairs, r)


def test_response_update_from_objects_cookies_replace():
    """Assigning a cookie replaces the first and drops later duplicates."""
    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Set-Cookie: foo=bar\r\n'
                       'Set-Cookie: baz=buzz\r\n'
                       'Header: out of fucking nowhere\r\n'
                       'Set-Cookie: foo=buzz\r\n'
                       '\r\n'))
    expected_pairs = [('foo', 'bar'), ('baz', 'buzz'), ('foo', 'buzz')]
    check_response_cookies(expected_pairs, r)

    r.cookies['foo'] = http.ResponseCookie('foo=banana')

    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Set-Cookie: foo=banana\r\n'
                               'Set-Cookie: baz=buzz\r\n'
                               'Header: out of fucking nowhere\r\n'
                               '\r\n')


def test_response_blank():
    """A response made only of line terminators serializes to ``''``."""
    r = http.Response('\r\n\n\n')
    assert r.full_response == ''


def test_response_blank_headers():
    """Headers with an empty value read back as the empty string."""
    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Header: \r\n'
                       'Header2:\r\n'))

    assert r.headers['header'] == ''
    assert r.headers['header2'] == ''


def test_response_newlines():
    """Bare ``\\n`` line endings are normalized in ``full_response``."""
    r = http.Response(('HTTP/1.1 200 OK\n'
                       'Content-Length: 4\n\r\n'
                       'AAAA'))
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Content-Length: 4\r\n\r\n'
                               'AAAA')


def test_copy_response():
    """``copy.copy`` of a response serializes identically."""
    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Content-Length: 4\r\n\r\n'
                       'AAAA'))
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Content-Length: 4\r\n\r\n'
                               'AAAA')

    r2 = copy.copy(r)
    # Check the copy itself (the original is already verified above)
    assert r2.full_response == ('HTTP/1.1 200 OK\r\n'
                                'Content-Length: 4\r\n\r\n'
                                'AAAA')


def test_response_add_cookie():
    """``add_cookie`` appends even when the cookie name already exists."""
    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Content-Length: 0\r\n'
                       'Set-Cookie: foo=bar\r\n\r\n'))
    r.add_cookie(http.ResponseCookie('foo=baz'))
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Content-Length: 0\r\n'
                               'Set-Cookie: foo=bar\r\n'
                               'Set-Cookie: foo=baz\r\n\r\n')


def test_response_set_cookie():
    """``set_cookie`` adds a new cookie or replaces an existing one."""
    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Content-Length: 0\r\n'))
    r.set_cookie(http.ResponseCookie('foo=bar'))
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Content-Length: 0\r\n'
                               'Set-Cookie: foo=bar\r\n\r\n')

    r.set_cookie(http.ResponseCookie('foo=baz'))
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Content-Length: 0\r\n'
                               'Set-Cookie: foo=baz\r\n\r\n')


def test_response_delete_cookie():
    """``delete_cookie`` removes every Set-Cookie header for the name."""
    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Content-Length: 0\r\n'
                       'Set-Cookie: foo=bar\r\n\r\n'))
    r.delete_cookie('foo')
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Content-Length: 0\r\n\r\n')

    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Content-Length: 0\r\n'
                       'Set-Cookie: foo=bar\r\n'
                       'Set-Cookie: foo=baz\r\n\r\n'))
    r.delete_cookie('foo')
    assert r.full_response == ('HTTP/1.1 200 OK\r\n'
                               'Content-Length: 0\r\n\r\n')


def test_response_short_statusline():
    """A status line with no reason phrase yields empty response text."""
    r = http.Response('HTTP/1.1 407\r\n\r\n')
    assert r.status_line == 'HTTP/1.1 407'
    assert r.response_text == ''
    assert r.version == 'HTTP/1.1'
    assert r.response_code == 407