You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1346 lines
43 KiB
1346 lines
43 KiB
import base64 |
|
import copy |
|
import gzip |
|
import json |
|
import pytest |
|
import StringIO |
|
import zlib |
|
|
|
from pappyproxy.pappy import http |
|
from pappyproxy.util import PappyException |
|
|
|
#################### |
|
# Helper Functions |
|
|
|
class TException(Exception):
    """Test-only exception type.

    Raised from helper callbacks so that a test can assert, via
    ``pytest.raises``, that the callback was actually invoked.
    """
|
|
|
def by_lines_and_full_helper(Type, id_attr, load_func, header_lines, data=''):
    """Build the same message in several ways and return every variant.

    ``Type`` is called once with no arguments and three times with the full
    message text (header lines joined with CRLF, a trailing CRLF, then
    ``data``). ``id_attr`` and ``load_func`` are accepted but are not used
    in this body.

    Returns a 4-tuple, all of whose members should be equivalent:
      * constructed from the full message text,
      * constructed empty and fed with ``add_line`` / ``add_data``,
      * constructed from the full message text a second time,
      * constructed from the full text and reloaded through
        ``from_json(to_json())``.
    """
    # NOTE(review): the original comment described the third variant as
    # "after calling update() on it", but no update() call is made here --
    # confirm whether one is missing.
    full_text = '\r\n'.join(header_lines) + '\r\n' + data

    # Variant fed one header line at a time, then the body (if any).
    from_lines = Type()
    for header_line in header_lines:
        from_lines.add_line(header_line)
    if data:
        from_lines.add_data(data)

    from_full = Type(full_text)
    from_update = Type(full_text)

    # Round-trip through the JSON serialization.
    from_json = Type(full_text)
    serialized = from_json.to_json()
    from_json.from_json(serialized)

    return (from_full, from_lines, from_update, from_json)
|
|
|
def req_by_lines_and_full(header_lines, data=''):
    """Return the request variants built from ``header_lines`` and ``data``.

    Thin wrapper that forwards to ``by_lines_and_full_helper`` with
    ``http.Request``, the id attribute name ``'reqid'`` and
    ``http.Request.load_request``; the helper's return value is passed
    back unchanged.
    """
    request_type = http.Request
    loader = http.Request.load_request
    return by_lines_and_full_helper(request_type, 'reqid', loader,
                                    header_lines, data)
|
|
|
def rsp_by_lines_and_full(header_lines, data=''):
    """Return the response variants built from ``header_lines`` and ``data``.

    Thin wrapper that forwards to ``by_lines_and_full_helper`` with
    ``http.Response``, the id attribute name ``'rspid'`` and
    ``http.Response.load_response``; the helper's return value is passed
    back unchanged.
    """
    response_type = http.Response
    loader = http.Response.load_response
    return by_lines_and_full_helper(response_type, 'rspid', loader,
                                    header_lines, data)
|
|
|
def gzip_string(string):
    """Return ``string`` gzip-compressed, as a byte string.

    The gzip stream is written into an in-memory binary buffer and the
    buffer's contents are returned.
    """
    # Imported locally: io.BytesIO is the binary in-memory buffer available
    # on both Python 2.6+ and Python 3, unlike the Python-2-only StringIO
    # module that was used here before.
    import io

    out = io.BytesIO()
    # The GzipFile has to be closed (end of the with block) before the
    # buffer is read so that the gzip trailer has been written.
    with gzip.GzipFile(fileobj=out, mode="w") as f:
        f.write(string)
    return out.getvalue()
|
|
|
def deflate_string(string):
    """Return ``string`` compressed with zlib, minus wrapper bytes.

    The first 2 bytes and the last 4 bytes of the ``zlib.compress`` output
    are dropped and the remaining bytes are returned.
    """
    compressed = zlib.compress(string)
    end = len(compressed) - 4
    return compressed[2:end]
|
|
|
def check_response_cookies(exp_pairs, rsp):
    """Assert that the cookies on ``rsp`` match ``exp_pairs``.

    ``rsp.cookies.all_pairs()`` is iterated as 2-item pairs; the first item
    is ignored and the second is reduced to ``(cookie.key, cookie.val)``.
    The resulting list must equal ``exp_pairs``.
    """
    actual_pairs = []
    for _, cookie in rsp.cookies.all_pairs():
        actual_pairs.append((cookie.key, cookie.val))
    assert actual_pairs == exp_pairs
|
|
|
|
|
#################### |
|
# Data storage |
|
|
|
def test_chunked_simple():
    """Check a simple add_data with one complete chunked body."""
    chunked = http.ChunkedData()
    assert (not chunked.complete)

    wire_data = '5\r\n' + 'A'*5 + '\r\n' + '0\r\n\r\n'
    chunked.add_data(wire_data)
    assert chunked.complete
    assert chunked.body == 'A'*5
|
|
|
def test_chunked_hex():
    """Check hex chunk lengths written in lower, upper and mixed case."""
    expected_body = 'A'*0xAF
    for length_field in ('af', 'AF', 'aF'):
        chunked = http.ChunkedData()
        wire_data = length_field + '\r\n'
        wire_data += expected_body
        wire_data += '\r\n'
        wire_data += '0\r\n\r\n'
        chunked.add_data(wire_data)
        assert chunked.complete
        assert chunked.body == expected_body
|
|
|
def test_chunked_leading_zeros():
    """Check a chunk length that is padded with leading zeros."""
    chunked = http.ChunkedData()
    wire_data = '000000000000000aF\r\n' + 'A'*0xAF + '\r\n' + '0\r\n\r\n'
    chunked.add_data(wire_data)
    assert chunked.complete
    assert chunked.body == 'A'*0xAF
|
|
|
def test_chunked_one_char_add():
    """Check feeding a chunked body one character at a time."""
    chunked = http.ChunkedData()
    wire_data = 'af\r\n' + 'A'*0xAF + '\r\n' + '0\r\n\r\n'
    for single_char in wire_data:
        chunked.add_data(single_char)
    assert chunked.complete
    assert chunked.body == 'A'*0xAF
|
|
|
def test_chunked_incomplete():
    """Check that complete isn't true until the data is received."""
    # right now we're fine ending on 0 without \r\n
    wire_data = 'af\r\n' + 'A'*0xAF + '\r\n' + '0'

    # Each truncated prefix delivered in a single add_data call.
    for cut in range(len(wire_data)-1):
        chunked = http.ChunkedData()
        chunked.add_data(wire_data[:cut])
        assert not chunked.complete

    # The same truncated prefixes delivered one character at a time.
    for cut in range(len(wire_data)-1):
        chunked = http.ChunkedData()
        for single_char in wire_data[:cut]:
            chunked.add_data(single_char)
        assert not chunked.complete
|
|
|
def test_length_data_simple():
    """Check LengthData completion for full, zero-length and short input."""
    # Exactly the declared amount of data.
    exact = http.LengthData(100)
    assert not exact.complete
    exact.add_data('A'*100)
    assert exact.complete
    assert exact.body == 'A'*100

    # A declared length of zero needs no data at all.
    empty = http.LengthData(0)
    assert empty.complete
    assert empty.body == ''

    # One byte short of the declared length.
    short = http.LengthData(100)
    short.add_data('A'*99)
    assert not short.complete
|
|
|
def test_length_one_character():
    """Check LengthData when data arrives one character at a time."""
    # All 100 characters delivered.
    filled = http.LengthData(100)
    for _ in range(100):
        filled.add_data('A')
    assert filled.complete
    assert filled.body == 'A'*100

    # Only 99 of the 100 characters delivered.
    partial = http.LengthData(100)
    for _ in range(99):
        partial.add_data('A')
    assert not partial.complete
|
|
|
def test_length_overflow():
    """Check LengthData behaviour when more data than declared is added."""
    # Only the declared number of characters is saved.
    oversized = http.LengthData(100)
    oversized.add_data('A'*400)
    assert oversized.complete
    assert oversized.body == 'A'*100

    # Adding data after completion raises.
    finished = http.LengthData(100)
    finished.add_data('A'*100)
    with pytest.raises(PappyException):
        finished.add_data('A')
|
|
|
def test_repeatable_dict_simple():
    """Check set, append, add_pairs and the lookup helpers of RepeatableDict."""
    rd = http.RepeatableDict()
    assert 'foo' not in rd
    rd['foo'] = 'bar'
    assert 'foo' in rd
    rd['baz'] = 'fuzz'
    rd.append('foo', 'fizz')

    # Item lookup gives the most recently added value for a repeated key.
    assert rd['foo'] == 'fizz'
    assert rd['baz'] == 'fuzz'
    assert rd.all_vals('foo') == ['bar', 'fizz']
    pairs_after_append = [('foo', 'bar'),
                          ('baz', 'fuzz'),
                          ('foo', 'fizz')]
    assert rd.all_pairs() == pairs_after_append

    assert 'fee' not in rd
    rd.add_pairs([('fee', 'fi'),
                  ('foo', 'fo')])
    assert 'fee' in rd
    assert rd['fee'] == 'fi'
    assert rd['baz'] == 'fuzz'
    assert rd['foo'] == 'fo'
    assert rd.all_vals('foo') == ['bar', 'fizz', 'fo']
    pairs_after_add = [('foo', 'bar'),
                       ('baz', 'fuzz'),
                       ('foo', 'fizz'),
                       ('fee', 'fi'),
                       ('foo', 'fo')]
    assert rd.all_pairs() == pairs_after_add
|
|
|
def test_repeatable_dict_constructor():
    """Check a RepeatableDict populated through its constructor."""
    initial_pairs = [('foo','bar'),('baz','fuzz')]
    rd = http.RepeatableDict(initial_pairs)
    assert 'foo' in rd
    assert rd['foo'] == 'bar'
    assert rd['baz'] == 'fuzz'
    assert rd.all_vals('foo') == ['bar']
    assert rd.all_pairs() == [('foo', 'bar'),
                              ('baz', 'fuzz')]
|
|
|
def test_repeatable_dict_case_insensitive():
    """Check case-insensitive lookups however the dict was populated."""
    def check(rd):
        # Membership and lookup ignore the case of the key...
        assert 'foo' in rd
        assert 'fOo' in rd
        for key in ('foo', 'Foo', 'FoO'):
            assert rd[key] == 'fuzz'

        for key in ('foo', 'Foo', 'FoO'):
            assert rd.all_vals(key) == ['bar', 'fuzz']

        # ...but the stored pairs keep the keys as they were given.
        assert rd.all_pairs() == [('foo', 'bar'),
                                  ('fOo', 'fuzz')]

    # Populated through the constructor.
    via_ctor = http.RepeatableDict([('foo','bar'),('fOo','fuzz')], case_insensitive=True)
    check(via_ctor)

    # Populated through item assignment and append.
    via_append = http.RepeatableDict(case_insensitive=True)
    via_append['foo'] = 'bar'
    via_append.append('fOo', 'fuzz')
    check(via_append)

    # Populated through add_pairs.
    via_add_pairs = http.RepeatableDict(case_insensitive=True)
    via_add_pairs.add_pairs([('foo','bar'),('fOo','fuzz')])
    check(via_add_pairs)
|
|
|
def test_repeatable_dict_overwrite():
    """Check that item assignment replaces every value of a repeated key."""
    initial_pairs = [('foo','bar'),('foo','fuzz'),('bar','baz')]
    rd = http.RepeatableDict(initial_pairs)
    rd['foo'] = 'asdf'
    assert rd.all_vals('foo') == ['asdf']
|
|
|
def test_repeatable_dict_deletion():
    """Check that deleting a key removes all of its case variants."""
    rd = http.RepeatableDict([('foo','bar'),('fOo','fuzz'),('bar','baz')],
                             case_insensitive=True)
    assert 'foo' in rd
    del rd['foo']
    assert 'foo' not in rd

    # Lookups of either spelling now fail.
    with pytest.raises(KeyError):
        rd['foo']

    with pytest.raises(KeyError):
        rd['fOo']

    # Unrelated keys are untouched.
    assert rd['bar'] == 'baz'
    assert rd.all_vals('foo') == []
|
|
|
def test_repeatable_dict_callback():
    """Check that mutations invoke the modify callback once it is set."""
    def raise_marker():
        raise TException()

    rd = http.RepeatableDict()
    # Before a callback is registered these calls are expected not to raise.
    rd['a'] = 'b'
    rd.add_pairs([('c', 'd')])
    rd.update('a', 'c')

    rd.set_modify_callback(raise_marker)
    # Each kind of mutation should now reach the callback.
    with pytest.raises(TException):
        rd['a'] = 'b'
    with pytest.raises(TException):
        rd.add_pairs([('c', 'd')])
    with pytest.raises(TException):
        rd.update('a', 'c')
|
|
|
|
|
#################### |
|
## Cookies |
|
|
|
def test_response_cookie_simple():
    """Check a cookie string holding only a key/value pair."""
    cookie = http.ResponseCookie('ck=1234;')
    assert cookie.key == 'ck'
    assert cookie.val == '1234'

    # No attributes were given, so the flags are falsy and the rest is None.
    assert not cookie.secure
    assert not cookie.http_only
    assert cookie.domain is None
    assert cookie.expires is None
    assert cookie.max_age is None
    assert cookie.path is None
|
|
|
def test_response_cookie_params():
    """Check a cookie string whose attributes are separated by '; '."""
    cookie_str = 'ck=1234; Expires=Wed, 09 Jun 2021 10:18:14 GMT; secure; httponly; path=/; max-age=12; domain=.foo.bar'
    cookie = http.ResponseCookie(cookie_str)
    assert cookie.key == 'ck'
    assert cookie.val == '1234'
    assert cookie.domain == '.foo.bar'
    assert cookie.expires == 'Wed, 09 Jun 2021 10:18:14 GMT'
    assert cookie.http_only
    assert cookie.max_age == 12
    assert cookie.path == '/'
    assert cookie.secure
|
|
|
def test_response_cookie_parsing():
    """Check a cookie with '=' in its value and no spaces after ';'."""
    cookie_str = 'ck=1234=567;Expires=Wed, 09 Jun 2021 10:18:14 GMT;secure;httponly;path=/;max-age=12;domain=.foo.bar'
    cookie = http.ResponseCookie(cookie_str)
    assert cookie.key == 'ck'
    assert cookie.val == '1234=567'
    assert cookie.domain == '.foo.bar'
    assert cookie.expires == 'Wed, 09 Jun 2021 10:18:14 GMT'
    assert cookie.http_only
    assert cookie.max_age == 12
    assert cookie.path == '/'
    assert cookie.secure
|
|
|
def test_response_cookie_blank():
    """Check cookies whose key/value part is blank or has no value."""
    # Don't ask why this exists, I've run into it
    cases = [
        (' ; path=/; secure', ''),
        ('; path=/; secure', ''),
        ('asdf; path=/; secure', 'asdf'),
    ]
    for cookie_str, expected_key in cases:
        cookie = http.ResponseCookie(cookie_str)
        assert cookie.key == expected_key
        assert cookie.val == ''
        assert cookie.path == '/'
        assert cookie.secure
|
|
|
#################### |
|
## HTTPMessage tests |
|
|
|
def test_message_simple():
    """Check an HTTPMessage parsed from a complete raw string."""
    head = ('foobar\r\n'
            'a: b\r\n'
            'Content-Length: 100\r\n\r\n')
    raw = head + 'A'*100
    msg = http.HTTPMessage(raw)
    assert msg.complete
    assert msg.malformed == False
    assert msg.start_line == 'foobar'
    assert msg.body == 'A'*100
    assert msg.headers.all_pairs() == [('a', 'b'), ('Content-Length', '100')]
    # Header lookup works with a differently-cased name.
    assert msg.headers['A'] == 'b'
    assert msg.headers_section == head
    assert msg.full_message == raw
|
|
|
def test_message_build():
    """Check an HTTPMessage assembled with add_line and add_data."""
    head = ('foobar\r\n'
            'a: b\r\n'
            'Content-Length: 100\r\n\r\n')
    raw = head + 'A'*100

    msg = http.HTTPMessage()
    for line in ('foobar', 'a: b', 'Content-Length: 100', ''):
        msg.add_line(line)
    assert not msg.complete

    # The body arrives in two halves; only the second completes the message.
    msg.add_data('A'*50)
    assert not msg.complete
    msg.add_data('A'*50)
    assert msg.complete

    assert msg.malformed == False
    assert msg.start_line == 'foobar'
    assert msg.body == 'A'*100
    assert msg.headers.all_pairs() == [('a', 'b'), ('Content-Length', '100')]
    assert msg.headers['A'] == 'b'
    assert msg.headers_section == head
    assert msg.full_message == raw
|
|
|
def test_message_build_chunked():
    """Check an HTTPMessage whose body is delivered as chunked data.

    The message is built with a ``Transfer-Encoding: chunked`` header, while
    the assertions expect the resulting headers and full message to carry
    ``Content-Length: 100`` instead.
    """
    head = ('foobar\r\n'
            'a: b\r\n'
            'Content-Length: 100\r\n\r\n')
    raw = head + 'A'*100

    msg = http.HTTPMessage()
    for line in ('foobar', 'a: b', 'Transfer-Encoding: chunked', ''):
        msg.add_line(line)
    assert not msg.complete

    # Two 50-byte chunks, each sent as size line, payload and CRLF.
    for _ in range(2):
        msg.add_data('%x\r\n' % 50)
        msg.add_data('A'*50)
        msg.add_data('\r\n')
    # Terminating zero-length chunk.
    msg.add_data('0\r\n')
    assert msg.complete

    assert msg.malformed == False
    assert msg.start_line == 'foobar'
    assert msg.body == 'A'*100
    assert msg.headers.all_pairs() == [('a', 'b'), ('Content-Length', '100')]
    assert msg.headers['A'] == 'b'
    assert msg.headers_section == head
    assert msg.full_message == raw
|
|
|
def test_message_badheader():
    """Check header parsing when a line has no colon or no space after it."""
    raw_lines = [
        'startline\r\n',
        'a: b\r\n',
        'Content-Encoding\r\n',
        'd: e\r\n',
        'f:g\r\n',
        '\r\n',
    ]
    msg = http.HTTPMessage(''.join(raw_lines))
    assert msg.headers['a'] == 'b'
    # A header line without a colon is kept with a value of None.
    assert msg.headers['content-encoding'] is None
    assert msg.headers['f'] == 'g'
|
|
|
#################### |
|
## Request tests |
|
|
|
def test_request_simple():
    """Check the parsed fields of a basic request, for every build variant."""
    header_lines = [
        'GET / HTTP/1.1',
        'Content-Type: text/xml; charset="utf-8"',
        'Accept-Encoding: gzip,deflate',
        'User-Agent: TestAgent',
        'Host: www.test.com',
        'Content-Length: 100',
        'Connection: Keep-Alive',
        'Cache-Control: no-cache',
        '',
    ]
    headers = '\r\n'.join(header_lines)+'\r\n'
    data = 'A'*100

    def check(req):
        assert req.complete
        assert req.fragment == None
        assert req.full_request == headers+data
        assert req.headers_complete
        assert req.host == 'www.test.com'
        assert req.is_ssl == False
        assert req.path == '/'
        assert req.port == 80
        assert req.start_line == 'GET / HTTP/1.1'
        assert req.verb == 'GET'
        assert req.version == 'HTTP/1.1'
        assert req.headers['Content-Length'] == '100'
        # Header names are looked up regardless of case.
        assert req.headers['CoNtent-lENGTH'] == '100'
        assert req.headers['Content-Type'] == 'text/xml; charset="utf-8"'
        assert req.headers['Accept-Encoding'] == 'gzip,deflate'
        assert req.headers['User-Agent'] == 'TestAgent'
        assert req.headers['Host'] == 'www.test.com'
        assert req.headers['Connection'] == 'Keep-Alive'
        assert req.headers['Cache-Control'] == 'no-cache'
        assert req.body == 'A'*100

    for variant in req_by_lines_and_full(header_lines, data):
        check(variant)
|
|
|
def test_request_urlparams():
    """Check URL parameters and fragment parsed from the request line."""
    header_lines = [
        'GET /?p1=foo&p2=bar#frag HTTP/1.1',
        'Content-Length: 0',
        '',
    ]
    expected_full = ('GET /?p1=foo&p2=bar#frag HTTP/1.1\r\n'
                     'Content-Length: 0\r\n'
                     '\r\n')

    def check(req):
        assert req.complete
        assert req.fragment == 'frag'
        assert req.url_params['p1'] == 'foo'
        assert req.url_params['p2'] == 'bar'
        assert req.full_request == expected_full

    for variant in req_by_lines_and_full(header_lines):
        check(variant)
|
|
|
def test_request_questionmark_url():
    """Check URL parameter parsing when the path contains extra '?' marks."""
    header_lines = [
        'GET /path/??/to/?p1=foo&p2=bar#frag HTTP/1.1',
        'Content-Length: 0',
        '',
    ]
    expected_full = ('GET /path/??/to/?p1=foo&p2=bar#frag HTTP/1.1\r\n'
                     'Content-Length: 0\r\n'
                     '\r\n')

    def check(req):
        assert req.complete
        assert req.fragment == 'frag'
        # Everything after the first '?' is treated as the query string.
        assert req.url_params['?/to/?p1'] == 'foo'
        assert req.url_params['p2'] == 'bar'
        assert req.full_request == expected_full

    for variant in req_by_lines_and_full(header_lines):
        check(variant)
|
|
|
def test_request_postparams():
    """Check POST parameters parsed from a urlencoded request body."""
    header_lines = [
        'GET / HTTP/1.1',
        'Content-Length: 9',
        'Content-Type: application/x-www-form-urlencoded',
        '',
    ]
    data = 'a=b&c=dee'

    def check(req):
        assert req.complete
        assert req.post_params['a'] == 'b'
        assert req.post_params['c'] == 'dee'

    for variant in req_by_lines_and_full(header_lines, data):
        check(variant)
|
|
|
def test_post_params_update():
    """Check that changing post_params rewrites the request body."""
    head = ('GET / HTTP/1.1\r\n'
            'Content-Type: application/x-www-form-urlencoded\r\n'
            'Content-Length: 7\r\n\r\n')
    req = http.Request(head + 'a=b&c=d')

    req.post_params['c'] = 'e'
    assert req.full_request == head + 'a=b&c=e'

    req.post_params['a'] = 'f'
    assert req.full_request == head + 'a=f&c=e'
|
|
|
def test_headers_end():
    """Check headers_complete once the blank line is added but no body yet."""
    header_lines = [
        'GET / HTTP/1.1',
        'Content-Type: text/xml; charset="utf-8"',
        'Accept-Encoding: gzip,deflate',
        'User-Agent: TestAgent',
        'Host: www.test.com',
        'Content-Length: 100',
        'Connection: Keep-Alive',
        'Cache-Control: no-cache',
        '',
    ]
    req = http.Request()
    for header_line in header_lines:
        req.add_line(header_line)
    # The declared 100-byte body was never supplied.
    assert not req.complete
    assert req.headers_complete
|
|
|
def test_request_cookies():
    """Check cookies parsed from a Cookie header, including a repeated name."""
    header_lines = [
        'GET /?p1=foo&p2=bar#frag HTTP/1.1',
        'Content-Length: 0',
        'Cookie: abc=WRONG; def=456; ghi=789; abc=123',
        '',
    ]

    def check(req):
        assert req.complete
        # For the repeated name, item lookup gives the last value.
        assert req.cookies['abc'] == '123'
        assert req.cookies['def'] == '456'
        assert req.cookies['ghi'] == '789'
        assert req.cookies.all_vals('abc') == ['WRONG', '123']

    for variant in req_by_lines_and_full(header_lines):
        check(variant)
|
|
|
def test_request_parse_host():
    """Check host, port and SSL flag derived from a Host header with :443."""
    header_lines = [
        'GET / HTTP/1.1',
        'Content-Length: 0',
        'Host: www.test.com:443',
        '',
    ]

    def check(req):
        assert req.complete
        assert req.port == 443
        assert req.host == 'www.test.com'
        assert req.is_ssl

    for variant in req_by_lines_and_full(header_lines):
        check(variant)
|
|
|
def test_request_newline_delim():
    """Check that bare-LF line endings are normalised to CRLF in the output."""
    mixed_endings = ('GET / HTTP/1.1\n'
                     'Content-Length: 4\n'
                     'Test-Header: foo\r\n'
                     'Other-header: bar\n\r\n'
                     'AAAA')
    normalised = ('GET / HTTP/1.1\r\n'
                  'Content-Length: 4\r\n'
                  'Test-Header: foo\r\n'
                  'Other-header: bar\r\n\r\n'
                  'AAAA')
    req = http.Request(mixed_endings)
    assert req.full_request == normalised
|
|
|
def test_repeated_request_headers():
    """Check that the last of several same-named request headers wins."""
    header_lines = [
        'GET /?p1=foo&p2=bar#frag HTTP/1.1',
        'Content-Length: 0',
        'Test-Header: WRONG',
        'Test-Header: RIGHTiguess',
        '',
    ]

    def check(req):
        assert req.complete
        assert req.headers['test-header'] == 'RIGHTiguess'

    for variant in req_by_lines_and_full(header_lines):
        check(variant)
|
|
|
def test_request_update_statusline():
    """Check that assigning start_line populates verb, path and version."""
    req = http.Request()
    req.start_line = 'GET / HTTP/1.1'
    assert req.verb == 'GET'
    assert req.path == '/'
    assert req.version == 'HTTP/1.1'
    assert not req.complete

    expected_full = 'GET / HTTP/1.1\r\n\r\n'
    assert req.full_request == expected_full
|
|
|
def test_request_update_cookies():
    """Check that edits to req.cookies are reflected in the Cookie header."""
    req = http.Request()
    req.start_line = 'GET / HTTP/1.1'

    # Check new cookies
    req.cookies['foo'] = 'bar'
    req.cookies['baz'] = 'fuzz'
    with_new = ('GET / HTTP/1.1\r\n'
                'Cookie: foo=bar; baz=fuzz\r\n'
                '\r\n')
    assert req.full_request == with_new

    # Check updated cookies (should be updated in place)
    req.cookies['foo'] = 'buzz'
    with_updated = ('GET / HTTP/1.1\r\n'
                    'Cookie: foo=buzz; baz=fuzz\r\n'
                    '\r\n')
    assert req.full_request == with_updated

    # Check repeated cookies
    req.cookies.append('foo', 'bar')
    with_repeated = ('GET / HTTP/1.1\r\n'
                     'Cookie: foo=buzz; baz=fuzz; foo=bar\r\n'
                     '\r\n')
    assert req.full_request == with_repeated
|
|
|
def test_request_update_headers():
    """Check that headers set and appended on a request are serialized in order."""
    req = http.Request()
    req.start_line = 'GET / HTTP/1.1'
    req.headers['Content-Length'] = '0'
    req.headers['Test-Header'] = 'Test Value'
    req.headers['Other-Header'] = 'Other Value'
    req.headers['Host'] = 'www.test.com'
    req.headers.append('Test-Header', 'Test Value2')

    expected_full = ('GET / HTTP/1.1\r\n'
                     'Content-Length: 0\r\n'
                     'Test-Header: Test Value\r\n'
                     'Other-Header: Other Value\r\n'
                     'Host: www.test.com\r\n'
                     'Test-Header: Test Value2\r\n'
                     '\r\n')
    assert req.full_request == expected_full
    # Setting the Host header is also reflected in req.host.
    assert req.host == 'www.test.com'
|
|
|
def test_request_modified_headers():
    """Check that cookie and body changes rewrite the matching headers."""
    req = http.Request()
    req.start_line = 'GET / HTTP/1.1'
    req.headers['content-length'] = '100'
    req.headers['cookie'] = 'abc=123'
    # These two edits are expected to update the headers set above.
    req.cookies['abc'] = '456'
    req.body = 'AAAA'

    expected_full = ('GET / HTTP/1.1\r\n'
                     'content-length: 4\r\n'
                     'cookie: abc=456\r\n\r\n'
                     'AAAA')
    assert req.full_request == expected_full
    assert req.headers['content-length'] == '4'
    assert req.headers['cookie'] == 'abc=456'
|
|
|
def test_request_update_data():
    """Check that assigning the body corrects a stale content-length."""
    req = http.Request()
    req.start_line = 'GET / HTTP/1.1'
    req.headers['content-length'] = 500
    req.body = 'AAAA'

    expected_full = ('GET / HTTP/1.1\r\n'
                     'content-length: 4\r\n'
                     '\r\n'
                     'AAAA')
    assert req.full_request == expected_full
|
def test_request_to_json():
    """Check the JSON produced by Request.to_json().

    A request with a body, two tags, a request id and an attached response
    is serialized; the decoded JSON must carry the base64-encoded full
    request, the response id, the port, the SSL flag, the tags, the
    request id and the (empty) host.
    """
    r = http.Request()
    r.start_line = 'GET / HTTP/1.1'
    r.headers['content-length'] = 500
    r.tags = {'foo', 'bar'}
    r.body = 'AAAA'
    r.reqid = '1'

    rsp = http.Response()
    rsp.start_line = 'HTTP/1.1 200 OK'
    rsp.rspid = '2'

    r.response = rsp

    expected_reqdata = {'full_message': unicode(base64.b64encode(r.full_request)),
                        'response_id': str(rsp.rspid),
                        'port': 80,
                        'is_ssl': False,
                        'tags': sorted(['foo', 'bar']),
                        'reqid': str(r.reqid),
                        'host': '',
                        }

    actual_reqdata = json.loads(r.to_json())
    # The tags are assigned as a set, whose iteration order is not defined,
    # so compare them in sorted order instead of relying on the order in
    # which they happened to be serialized.
    actual_reqdata['tags'] = sorted(actual_reqdata['tags'])
    assert actual_reqdata == expected_reqdata
|
|
|
def test_request_update_content_length():
    """Check update_content_length=True fixes a wrong Content-Length."""
    body = 'AAAAAAAAAA'
    raw = ('GET / HTTP/1.1\r\n'
           'Content-Length: 4\r\n\r\n') + body
    req = http.Request(raw, update_content_length=True)

    expected_full = ('GET / HTTP/1.1\r\n'
                     'Content-Length: 10\r\n\r\n') + body
    assert req.full_request == expected_full
|
|
|
def test_request_blank_url_params():
    """Check request lines with '??' in the path and a valueless parameter."""
    plain = http.Request()
    plain.add_line('GET /this/??-asdf/ HTTP/1.1')
    assert plain.full_request == ('GET /this/??-asdf/ HTTP/1.1\r\n\r\n')

    with_params = http.Request()
    with_params.add_line('GET /this/??-asdf/?a=b&c&d=ef HTTP/1.1')
    assert with_params.full_request == ('GET /this/??-asdf/?a=b&c&d=ef HTTP/1.1\r\n\r\n')
    assert with_params.url_params['?-asdf/?a'] == 'b'
    # A parameter with no '=' has a value of None.
    assert with_params.url_params['c'] == None
    assert with_params.url_params['d'] == 'ef'
|
|
|
def test_request_blank():
    """Check that a request made only of newlines serializes to ''."""
    only_newlines = '\r\n\n\n'
    req = http.Request(only_newlines)
    assert req.full_request == ''
|
|
|
def test_request_blank_headers():
    """Check that headers without a value parse to the empty string."""
    raw = ('GET / HTTP/1.1\r\n'
           'Header: \r\n'
           'Header2:\r\n')
    req = http.Request(raw)

    for name in ('header', 'header2'):
        assert req.headers[name] == ''
|
|
|
def test_request_blank_cookies():
    """Check Cookie headers that are empty or contain a valueless entry."""
    cases = [
        ('Cookie: \r\n', ''),
        ('Cookie: a=b; ; c=d\r\n', ''),
        ('Cookie: a=b; foo; c=d\r\n', 'foo'),
    ]
    for cookie_header, cookie_name in cases:
        req = http.Request('GET / HTTP/1.1\r\n' + cookie_header)
        assert req.cookies[cookie_name] == ''
|
|
|
def test_request_set_url():
    """Check host, port and SSL flag after assigning various URLs.

    The same request object is reused for every assignment.
    """
    req = http.Request('GET / HTTP/1.1\r\n')

    # (url assigned, expected port, expected truthiness of is_ssl)
    cases = [
        ('www.AAAA.BBBB', 80, False),
        ('https://www.AAAA.BBBB', 443, True),
        ('https://www.AAAA.BBBB:1234', 1234, True),
        ('http://www.AAAA.BBBB:443', 443, False),
        ('www.AAAA.BBBB:443', 443, True),
    ]
    for url, expected_port, expected_ssl in cases:
        req.url = url
        assert req.host == 'www.AAAA.BBBB'
        assert req.port == expected_port
        assert bool(req.is_ssl) == expected_ssl
|
|
|
def test_request_set_url_params():
    """Check params/fragment from an assigned URL and how url is rebuilt."""
    req = http.Request('GET / HTTP/1.1\r\n')
    req.url = 'www.AAAA.BBBB?a=b&c=d#foo'
    assert req.url_params.all_pairs() == [('a','b'), ('c','d')]
    assert req.fragment == 'foo'
    assert req.url == 'http://www.AAAA.BBBB?a=b&c=d#foo'

    # A non-default port shows up in the rebuilt URL.
    req.port = 400
    assert req.url == 'http://www.AAAA.BBBB:400?a=b&c=d#foo'

    # Switching SSL on changes the scheme and keeps the port.
    req.is_ssl = True
    assert req.url == 'https://www.AAAA.BBBB:400?a=b&c=d#foo'
|
|
|
def test_request_copy():
    """Check that copy.copy of a request serializes to the same message."""
    raw = ('GET / HTTP/1.1\r\n'
           'Content-Length: 4\r\n\r\n'
           'AAAA')
    original = http.Request(raw)
    duplicate = copy.copy(original)
    assert duplicate.full_request == raw
|
|
|
def test_request_url_blankpath():
    """Check full_path and url when the assigned URL has no path."""
    req = http.Request()
    req.start_line = 'GET / HTTP/1.1'
    req.url = 'https://www.google.com'
    req.headers['Host'] = req.host
    req.url_params.from_dict({'foo': 'bar'})

    # The path keeps its '/', while the rebuilt url has none before '?'.
    assert req.full_path == '/?foo=bar'
    assert req.url == 'https://www.google.com?foo=bar'
|
|
|
def test_request_modify_header2():
    """Check header and post-param edits on a copy of a request.

    The source request declares ``Content-Length: 282`` but supplies a
    7-character body; the expected output after each edit carries the
    actual body length.
    """
    source = http.Request(('POST /some/path HTTP/1.1\r\n'
                           'Host: test.host.thing\r\n'
                           'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.10; rv:43.0) Gecko/20100101 Firefox/43.0\r\n'
                           'Content-Length: 282\r\n'
                           'Connection: keep-alive\r\n'
                           '\r\n'
                           'a|b|c|d'))
    edited = source.copy()

    # Replace one header value.
    edited.headers['User-Agent'] = 'Moziller/6.9'
    after_header_edit = ('POST /some/path HTTP/1.1\r\n'
                         'Host: test.host.thing\r\n'
                         'User-Agent: Moziller/6.9\r\n'
                         'Content-Length: 7\r\n'
                         'Connection: keep-alive\r\n'
                         '\r\n'
                         'a|b|c|d')
    assert edited.full_message == after_header_edit

    # Setting a post param replaces the body and adds a Content-Type header.
    edited.post_params['foo'] = 'barr'
    after_param_edit = ('POST /some/path HTTP/1.1\r\n'
                        'Host: test.host.thing\r\n'
                        'User-Agent: Moziller/6.9\r\n'
                        'Content-Length: 8\r\n'
                        'Connection: keep-alive\r\n'
                        'Content-Type: application/x-www-form-urlencoded\r\n'
                        '\r\n'
                        'foo=barr')
    assert edited.full_message == after_param_edit
|
|
|
def test_request_absolute_url():
    """Check the request line when path_type is set to PATH_ABSOLUTE."""
    host_header = 'Host: www.example.faketld\r\n\r\n'
    req = http.Request('GET /foo/path HTTP/1.1\r\n' + host_header)
    assert req.full_message == 'GET /foo/path HTTP/1.1\r\n' + host_header

    # With an absolute path type the full URL appears in the request line.
    req.path_type = http.PATH_ABSOLUTE
    assert req.full_message == ('GET http://www.example.faketld/foo/path HTTP/1.1\r\n'
                                + host_header)

    # Enabling SSL switches the scheme in the request line.
    req.is_ssl = True
    assert req.full_message == ('GET https://www.example.faketld/foo/path HTTP/1.1\r\n'
                                + host_header)
|
|
|
def test_proxy_auth():
    """Check that proxy_creds adds a Basic Proxy-Authorization header."""
    req = http.Request(('GET /foo/path HTTP/1.1\r\n'
                        'Host: www.example.faketld\r\n\r\n'))
    credentials = ('username', 'pass:word')
    req.proxy_creds = credentials

    expected_full = ('GET /foo/path HTTP/1.1\r\n'
                     'Host: www.example.faketld\r\n'
                     'Proxy-Authorization: Basic dXNlcm5hbWU6cGFzczp3b3Jk\r\n\r\n')
    assert req.full_message == expected_full
|
|
|
def test_request_connect_request():
    """Check connect_request before and after SSL is enabled."""
    req = http.Request(('GET /foo/path HTTP/1.1\r\n'
                        'Host: www.example.faketld\r\n\r\n'))
    # No CONNECT request for a non-SSL request.
    assert req.connect_request == None

    req.is_ssl = True
    expected_connect = ('CONNECT www.example.faketld:443 HTTP/1.1\r\n'
                        'Host: www.example.faketld\r\n\r\n')
    assert req.connect_request.full_message == expected_connect
|
|
|
def test_request_connect_request_creds():
    """Check that the CONNECT request carries the proxy credentials."""
    req = http.Request(('GET /foo/path HTTP/1.1\r\n'
                        'Host: www.example.faketld\r\n\r\n'))
    req.is_ssl = True
    req.proxy_creds = ('username', 'pass:word')

    expected_connect = ('CONNECT www.example.faketld:443 HTTP/1.1\r\n'
                        'Host: www.example.faketld\r\n'
                        'Proxy-Authorization: Basic dXNlcm5hbWU6cGFzczp3b3Jk\r\n\r\n')
    assert req.connect_request.full_message == expected_connect
|
|
|
#################### |
|
## Response tests |
|
|
|
def test_response_simple():
    """Check the parsed fields of a basic response, for every build variant."""
    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Content-Length: 100',
        '',
    ]
    data = 'A'*100

    def check(rsp):
        assert rsp.complete
        assert rsp.body == data
        assert rsp.response_code == 200
        assert rsp.response_text == 'OK'
        assert rsp.start_line == 'HTTP/1.1 200 OK'
        assert rsp.version == 'HTTP/1.1'

        assert rsp.headers['Date'] == 'Thu, 22 Oct 2015 00:37:17 GMT'
        assert rsp.headers['Cache-Control'] == 'private, max-age=0'
        assert rsp.headers['Content-Type'] == 'text/html; charset=UTF-8'
        assert rsp.headers['Server'] == 'gws'
        assert rsp.headers['Content-Length'] == '100'
        # Header names are looked up regardless of case.
        assert rsp.headers['CoNTEnT-leNGTH'] == '100'

    for variant in rsp_by_lines_and_full(header_lines, data):
        check(variant)
|
|
|
def test_response_chunked():
    """Check that a chunked response body is reassembled from its chunks."""
    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Transfer-Encoding: chunked',
        '',
    ]
    # Two chunks (0xAF and 0xBF bytes) followed by the terminating chunk.
    data = ''.join([
        'af\r\n',
        'A'*0xAF + '\r\n',
        'BF\r\n',
        'B'*0xBF + '\r\n',
        '0\r\n\r\n',
    ])

    def check(rsp):
        assert rsp.complete
        assert rsp.body == 'A'*0xAF + 'B'*0xBF

    for variant in rsp_by_lines_and_full(header_lines, data):
        check(variant)
|
|
|
def test_response_gzip():
    """Check that a gzip-encoded response body is exposed decompressed."""
    data_decomp = 'Hello woru!'
    data_comp = gzip_string(data_decomp)

    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Content-Encoding: gzip',
        'Content-Length: %d' % len(data_comp),
        '',
    ]

    def check(rsp):
        assert rsp.complete
        assert rsp.body == data_decomp

    for variant in rsp_by_lines_and_full(header_lines, data_comp):
        check(variant)
|
|
|
def test_response_deflate():
    """Check that a deflate-encoded response body is exposed decompressed."""
    data_decomp = 'Hello woru!'
    data_comp = deflate_string(data_decomp)

    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Content-Encoding: deflate',
        'Content-Length: %d' % len(data_comp),
        '',
    ]

    def check(rsp):
        assert rsp.complete
        assert rsp.body == data_decomp

    for variant in rsp_by_lines_and_full(header_lines, data_comp):
        check(variant)
|
|
|
def test_response_chunked_gzip():
    """Check a response that is both gzip-encoded and chunked.

    The expected serialized response has the decompressed body, a
    Content-Length header, and neither the Content-Encoding nor the
    Transfer-Encoding header.
    """
    data_decomp = 'Hello world!'
    data_comp = gzip_string(data_decomp)
    assert len(data_comp) > 3

    # Split the compressed data into a 3-byte chunk and one with the rest.
    first_part = data_comp[:3]
    rest_part = data_comp[3:]
    data_chunked = ''.join([
        '3\r\n',
        first_part,
        '\r\n%x\r\n' % (len(rest_part)),
        rest_part,
        '\r\n0\r\n',
    ])

    header_lines = [
        'HTTP/1.1 200 OK',
        'Date: Thu, 22 Oct 2015 00:37:17 GMT',
        'Cache-Control: private, max-age=0',
        'Content-Type: text/html; charset=UTF-8',
        'Server: gws',
        'Content-Encoding: gzip',
        'Transfer-Encoding: chunked',
        '',
    ]

    expected_template = ('HTTP/1.1 200 OK\r\n'
                         'Date: Thu, 22 Oct 2015 00:37:17 GMT\r\n'
                         'Cache-Control: private, max-age=0\r\n'
                         'Content-Type: text/html; charset=UTF-8\r\n'
                         'Server: gws\r\n'
                         'Content-Length: %d\r\n\r\n'
                         '%s')

    def check(rsp):
        assert rsp.complete
        assert rsp.body == data_decomp
        assert rsp.headers['Content-Length'] == str(len(data_decomp))
        assert rsp.full_response == expected_template % (len(data_decomp), data_decomp)

    for variant in rsp_by_lines_and_full(header_lines, data_chunked):
        check(variant)
|
|
|
def test_response_early_completion():
    """Check a zero-length response completes once its headers end."""
    rsp = http.Response()
    rsp.start_line = 'HTTP/1.1 200 OK'
    rsp.add_line('Content-Length: 0')
    # Headers are still open, so the response is not complete yet.
    assert not rsp.complete
    rsp.add_line('')
    assert rsp.complete
|
|
|
def test_response_cookies():
    """Check cookies parsed from several Set-Cookie response headers."""
    header_lines = [
        'HTTP/1.1 200 OK',
        'Content-Length: 0',
        'Set-Cookie: ck=1234=567;Expires=Wed, 09 Jun 2021 10:18:14 GMT;secure;httponly;path=/;max-age=12;domain=.foo.bar',
        'Set-Cookie: abc=123',
        'Set-Cookie: def=456',
        '',
    ]

    def check(rsp):
        assert rsp.complete

        # The cookie that carries every attribute.
        full_cookie = rsp.cookies['ck']
        assert full_cookie.key == 'ck'
        assert full_cookie.val == '1234=567'
        assert full_cookie.domain == '.foo.bar'
        assert full_cookie.expires == 'Wed, 09 Jun 2021 10:18:14 GMT'
        assert full_cookie.http_only
        assert full_cookie.max_age == 12
        assert full_cookie.path == '/'
        assert full_cookie.secure

        # The two plain key=value cookies.
        assert rsp.cookies['abc'].val == '123'
        assert rsp.cookies['def'].val == '456'

    for variant in rsp_by_lines_and_full(header_lines):
        check(variant)
|
|
|
def test_response_repeated_cookies():
    """Check that repeated Set-Cookie names are all kept, in header order."""
    raw = ('HTTP/1.1 200 OK\r\n'
           'Set-Cookie: foo=bar\r\n'
           'Set-Cookie: baz=buzz\r\n'
           'Set-Cookie: foo=buzz\r\n'
           '\r\n')
    rsp = http.Response(raw)
    check_response_cookies([('foo', 'bar'), ('baz', 'buzz'), ('foo', 'buzz')],
                           rsp)
|
|
|
def test_repeated_response_headers():
    """Check that a header lookup returns the last of several repeated headers.

    Repeated headers can be used for attacks, so ironically we have to handle
    them well. We always use the last header as the correct one.
    """
    header_lines = [
        'HTTP/1.1 200 OK',
        'Content-Length: 0',
        'Test-Head: WRONG',
        'Test-Head: RIGHTish',
        '',
    ]

    def check(rsp):
        assert rsp.complete
        # The lookup key is lower-case while the header was sent as 'Test-Head'.
        assert rsp.headers['test-head'] == 'RIGHTish'

    full, by_lines, updated, from_json = rsp_by_lines_and_full(header_lines)
    for variant in (full, by_lines, updated, from_json):
        check(variant)
|
|
|
def test_response_update_statusline():
    """Check that assigning start_line populates version, code and text."""
    rsp = http.Response()
    rsp.start_line = 'HTTP/1.1 200 OK'

    # Each component of the status line is exposed separately.
    assert rsp.version == 'HTTP/1.1'
    assert rsp.response_code == 200
    assert rsp.response_text == 'OK'

    # Only a status line was supplied, so the message is not complete.
    assert not rsp.complete

    expected = 'HTTP/1.1 200 OK\r\n\r\n'
    assert rsp.full_response == expected
|
|
|
def test_response_update_headers():
    """Check that header assignments and appends show up in full_response."""
    rsp = http.Response()
    rsp.start_line = 'HTTP/1.1 200 OK'
    rsp.headers['Test-Header'] = 'Test Value'
    rsp.headers['Other-Header'] = 'Other Value'

    after_assignment = ('HTTP/1.1 200 OK\r\n'
                        'Test-Header: Test Value\r\n'
                        'Other-Header: Other Value\r\n\r\n')
    assert rsp.full_response == after_assignment

    # append() adds a second Test-Header after the existing headers rather
    # than replacing the first one.
    rsp.headers.append('Test-Header', 'Other Test Value')
    after_append = ('HTTP/1.1 200 OK\r\n'
                    'Test-Header: Test Value\r\n'
                    'Other-Header: Other Value\r\n'
                    'Test-Header: Other Test Value\r\n\r\n')
    assert rsp.full_response == after_append
|
|
|
def test_response_update_modified_headers():
    """Check that setting the body corrects a stale content-length header."""
    rsp = http.Response()
    rsp.start_line = 'HTTP/1.1 200 OK'

    # Deliberately wrong length; assigning the body below should fix it.
    rsp.headers['content-length'] = '500'
    rsp.body = 'AAAA'

    expected = ('HTTP/1.1 200 OK\r\n'
                'content-length: 4\r\n\r\n'
                'AAAA')
    assert rsp.full_response == expected
    assert rsp.headers['content-length'] == '4'
|
|
|
def test_response_update_cookies():
    """Check that cookies stay in sync with Set-Cookie headers both ways."""
    # Part 1: cookies are driven by editing the headers.
    via_headers = http.Response()
    via_headers.start_line = 'HTTP/1.1 200 OK'

    via_headers.headers['Set-Cookie'] = 'abc=123'
    one_cookie = ('HTTP/1.1 200 OK\r\n'
                  'Set-Cookie: abc=123\r\n\r\n')
    assert via_headers.full_response == one_cookie
    assert via_headers.cookies['abc'].val == '123'

    # A second Set-Cookie with the same name: the later value is the one
    # returned by the cookie lookup.
    via_headers.headers.append('Set-Cookie', 'abc=456')
    two_cookies = ('HTTP/1.1 200 OK\r\n'
                   'Set-Cookie: abc=123\r\n'
                   'Set-Cookie: abc=456\r\n\r\n')
    assert via_headers.full_response == two_cookies
    assert via_headers.cookies['abc'].val == '456'

    # Part 2: headers are driven by assigning cookie objects.
    via_objects = http.Response()
    via_objects.start_line = 'HTTP/1.1 200 OK'

    cookie = http.ResponseCookie('abc=123; secure')
    via_objects.cookies['abc'] = cookie
    with_object = ('HTTP/1.1 200 OK\r\n'
                   'Set-Cookie: abc=123; secure\r\n\r\n')
    assert via_objects.full_response == with_object
|
|
|
def test_response_update_content_length():
    """Check that update_content_length=True rewrites a wrong Content-Length."""
    # The header claims 4 bytes but the body is 10 bytes long.
    raw_response = ('HTTP/1.1 200 OK\r\n'
                    'Content-Length: 4\r\n\r\n'
                    'AAAAAAAAAA')
    rsp = http.Response(raw_response, update_content_length=True)

    expected = ('HTTP/1.1 200 OK\r\n'
                'Content-Length: 10\r\n\r\n'
                'AAAAAAAAAA')
    assert rsp.full_response == expected
|
|
|
def test_response_to_json():
    """Check the JSON form of a response: base64 message plus its rspid."""
    rsp = http.Response()
    rsp.start_line = 'HTTP/1.1 200 OK'
    rsp.rspid = 2

    expected = {
        'full_message': base64.b64encode(rsp.full_response),
        'rspid': rsp.rspid,
        #'tag': r.tag,
    }

    serialized = rsp.to_json()
    assert json.loads(serialized) == expected
|
|
|
def test_response_update_from_objects_cookies():
    """Check that clearing and re-adding cookie objects rewrites the headers."""
    raw_response = ('HTTP/1.1 200 OK\r\n'
                    'Set-Cookie: foo=bar\r\n'
                    'Set-Cookie: baz=buzz\r\n'
                    'Header: out of fucking nowhere\r\n'
                    'Set-Cookie: foo=buzz\r\n'
                    '\r\n')
    rsp = http.Response(raw_response)

    # Sanity check of the parsed cookies before anything is modified.
    initial_pairs = [('foo', 'bar'), ('baz', 'buzz'), ('foo', 'buzz')]
    check_response_cookies(initial_pairs, rsp)

    # Throw away all cookies and install a new set, including a repeated name.
    replacement_pairs = [
        ('foo', http.ResponseCookie('foo=banana')),
        ('baz', http.ResponseCookie('baz=buzz')),
        ('scooby', http.ResponseCookie('scooby=doo')),
        ('foo', http.ResponseCookie('foo=boo')),
    ]
    rsp.cookies.clear()
    rsp.cookies.add_pairs(replacement_pairs)

    # The unrelated header stays; the new Set-Cookie headers follow it.
    expected_message = ('HTTP/1.1 200 OK\r\n'
                        'Header: out of fucking nowhere\r\n'
                        'Set-Cookie: foo=banana\r\n'
                        'Set-Cookie: baz=buzz\r\n'
                        'Set-Cookie: scooby=doo\r\n'
                        'Set-Cookie: foo=boo\r\n'
                        '\r\n')
    assert rsp.full_response == expected_message

    final_pairs = [('foo', 'banana'), ('baz', 'buzz'),
                   ('scooby', 'doo'), ('foo', 'boo')]
    check_response_cookies(final_pairs, rsp)
|
|
|
def test_response_update_from_objects_cookies_replace():
    """Check that assigning a cookie by key replaces every cookie of that name."""
    raw_response = ('HTTP/1.1 200 OK\r\n'
                    'Set-Cookie: foo=bar\r\n'
                    'Set-Cookie: baz=buzz\r\n'
                    'Header: out of fucking nowhere\r\n'
                    'Set-Cookie: foo=buzz\r\n'
                    '\r\n')
    rsp = http.Response(raw_response)

    # Sanity check of the parsed cookies before anything is modified.
    initial_pairs = [('foo', 'bar'), ('baz', 'buzz'), ('foo', 'buzz')]
    check_response_cookies(initial_pairs, rsp)

    rsp.cookies['foo'] = http.ResponseCookie('foo=banana')

    # The first 'foo' header now carries the new value and the later
    # duplicate 'foo' header is gone; the other headers are untouched.
    expected_message = ('HTTP/1.1 200 OK\r\n'
                        'Set-Cookie: foo=banana\r\n'
                        'Set-Cookie: baz=buzz\r\n'
                        'Header: out of fucking nowhere\r\n'
                        '\r\n')
    assert rsp.full_response == expected_message
|
|
|
def test_response_blank():
    """Check that input made only of line breaks yields an empty response."""
    only_newlines = '\r\n\n\n'
    rsp = http.Response(only_newlines)
    assert rsp.full_response == ''
|
|
|
def test_response_blank_headers():
    """Check that headers without a value are read as empty strings."""
    # One header has a trailing space after the colon, the other has nothing.
    raw_response = ('HTTP/1.1 200 OK\r\n'
                    'Header: \r\n'
                    'Header2:\r\n')
    rsp = http.Response(raw_response)

    assert rsp.headers['header'] == ''
    assert rsp.headers['header2'] == ''
|
|
|
def test_response_newlines():
    """Check that bare-LF line endings in the input come back out as CRLF."""
    # Input mixes '\n' and '\r\n' line terminators.
    lf_response = ('HTTP/1.1 200 OK\n'
                   'Content-Length: 4\n\r\n'
                   'AAAA')
    rsp = http.Response(lf_response)

    crlf_response = ('HTTP/1.1 200 OK\r\n'
                     'Content-Length: 4\r\n\r\n'
                     'AAAA')
    assert rsp.full_response == crlf_response
|
|
|
def test_copy_response():
    """Check that copy.copy() of a response serializes like the original.

    The previous version created the copy but then re-asserted on the
    original object, so the copy itself was never verified.
    """
    expected = ('HTTP/1.1 200 OK\r\n'
                'Content-Length: 4\r\n\r\n'
                'AAAA')

    r = http.Response(('HTTP/1.1 200 OK\r\n'
                       'Content-Length: 4\r\n\r\n'
                       'AAAA'))
    assert r.full_response == expected

    r2 = copy.copy(r)
    # The copy must carry the same message...
    assert r2.full_response == expected
    # ...and taking the copy must not have disturbed the original.
    assert r.full_response == expected
|
|
|
def test_response_add_cookie():
    """Check that add_cookie appends a Set-Cookie even if the name exists."""
    raw_response = ('HTTP/1.1 200 OK\r\n'
                    'Content-Length: 0\r\n'
                    'Set-Cookie: foo=bar\r\n\r\n')
    rsp = http.Response(raw_response)

    rsp.add_cookie(http.ResponseCookie('foo=baz'))

    # Both 'foo' cookies are present, the added one last.
    expected = ('HTTP/1.1 200 OK\r\n'
                'Content-Length: 0\r\n'
                'Set-Cookie: foo=bar\r\n'
                'Set-Cookie: foo=baz\r\n\r\n')
    assert rsp.full_response == expected
|
|
|
def test_response_set_cookie():
    """Check that set_cookie adds a new cookie and overwrites an existing one."""
    raw_response = ('HTTP/1.1 200 OK\r\n'
                    'Content-Length: 0\r\n')
    rsp = http.Response(raw_response)

    # First call: no cookie named 'foo' yet, so one is added.
    rsp.set_cookie(http.ResponseCookie('foo=bar'))
    after_first = ('HTTP/1.1 200 OK\r\n'
                   'Content-Length: 0\r\n'
                   'Set-Cookie: foo=bar\r\n\r\n')
    assert rsp.full_response == after_first

    # Second call with the same name: the value is replaced, not duplicated.
    rsp.set_cookie(http.ResponseCookie('foo=baz'))
    after_second = ('HTTP/1.1 200 OK\r\n'
                    'Content-Length: 0\r\n'
                    'Set-Cookie: foo=baz\r\n\r\n')
    assert rsp.full_response == after_second
|
|
|
def test_response_delete_cookie():
    """Check that delete_cookie removes every Set-Cookie with the given name."""
    without_cookies = ('HTTP/1.1 200 OK\r\n'
                       'Content-Length: 0\r\n\r\n')

    # First a single 'foo' cookie, then the same name set twice; in both
    # cases no Set-Cookie header should remain afterwards.
    raw_responses = [
        ('HTTP/1.1 200 OK\r\n'
         'Content-Length: 0\r\n'
         'Set-Cookie: foo=bar\r\n\r\n'),
        ('HTTP/1.1 200 OK\r\n'
         'Content-Length: 0\r\n'
         'Set-Cookie: foo=bar\r\n'
         'Set-Cookie: foo=baz\r\n\r\n'),
    ]
    for raw_response in raw_responses:
        rsp = http.Response(raw_response)
        rsp.delete_cookie('foo')
        assert rsp.full_response == without_cookies
|
|
|
def test_response_short_statusline():
    """Check parsing of a status line that has no reason phrase."""
    raw_response = 'HTTP/1.1 407\r\n\r\n'
    rsp = http.Response(raw_response)

    assert rsp.status_line == 'HTTP/1.1 407'
    # The missing reason phrase is reported as an empty string.
    assert rsp.response_text == ''
    assert rsp.version == 'HTTP/1.1'
    assert rsp.response_code == 407
|
|
|