You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
340 lines
10 KiB
340 lines
10 KiB
import re |
|
import sys |
|
import string |
|
import time |
|
import datetime |
|
import base64 |
|
from pygments.formatters import TerminalFormatter |
|
from pygments.lexers import get_lexer_for_mimetype, HttpLexer |
|
from pygments import highlight |
|
from io import StringIO |
|
from .colors import Colors, Styles, verb_color, scode_color, path_formatter, color_string |
|
|
|
|
|
def str_hash_code(s):
    """
    Return a polynomial (base 31) hash of the encoded bytes of ``s``.

    The result is ``sum(b[i] * 31**(len(b)-1-i))`` over the bytes ``b`` of
    ``s.encode()``, always as an int. The empty string hashes to 0.

    :param s: The string to hash
    :type s: String
    :rtype: Integer
    """
    # Horner's rule over the encoded bytes. Deriving the exponent from the
    # byte sequence itself (rather than from the character count) keeps the
    # exponent non-negative for multi-byte characters, so the result never
    # degrades into a float.
    h = 0
    for byte in s.encode():
        h = h * 31 + byte
    return h
|
|
|
def printable_data(data, colors=True):
    """
    Render ``data`` as text, substituting a period for every unprintable value.

    Each element of ``data`` is passed through ``chr()``, so ``data`` must
    iterate as integers (for example a ``bytes`` object). When ``colors`` is
    true, each run of substituted periods is wrapped in
    ``Styles.UNPRINTABLE_DATA`` / ``Colors.ENDC`` and a final ``Colors.ENDC``
    is appended to the result.

    :param data: The data to make printable
    :type data: Iterable of integers (e.g. bytes)
    :param colors: Whether to emit color escape sequences
    :rtype: String
    """
    pieces = []
    in_unprintable_run = False
    for value in data:
        char = chr(value)
        if char not in string.printable:
            # Open the highlight only at the start of a run of periods.
            if colors and not in_unprintable_run:
                pieces.append(Styles.UNPRINTABLE_DATA)
                in_unprintable_run = True
            pieces.append('.')
            continue
        # A printable character ends any highlighted run in progress.
        if colors and in_unprintable_run:
            pieces.append(Colors.ENDC)
            in_unprintable_run = False
        pieces.append(char)
    if colors:
        pieces.append(Colors.ENDC)
    return ''.join(pieces)
|
|
|
def remove_color(s):
    """
    Return ``s`` with escape sequences removed.

    A sequence is an ESC character (``\\x1b``) followed by everything up to
    and including the next ``m``.
    """
    return re.sub(r'\x1b[^m]*m', '', s)
|
|
|
def hexdump(src, length=16):
    """
    Format ``src`` as a hex dump, ``length`` values per line.

    Each line holds the offset in hex, the values as two-digit hex numbers
    and a text column. In the text column a value up to 127 is shown as its
    character when ``repr()`` of that character is exactly three characters
    long, otherwise as a period; values above 127 are shown as a period
    wrapped in ``Styles.UNPRINTABLE_DATA`` / ``Colors.ENDC``.

    :param src: The data to dump; must slice into integers (e.g. bytes)
    :param length: Number of values per output line
    :rtype: String
    """
    # Lookup table from value to the character shown in the text column.
    display = ''.join(
        chr(code) if len(repr(chr(code))) == 3 else '.'
        for code in range(256)
    )
    out = []
    for offset in range(0, len(src), length):
        chunk = src[offset:offset + length]
        hex_column = ' '.join("%02x" % value for value in chunk)
        text_column = ''.join(
            display[value] if value <= 127
            else Styles.UNPRINTABLE_DATA + '.' + Colors.ENDC
            for value in chunk
        )
        out.append("%04x %-*s %s\n" % (offset, length * 3, hex_column, text_column))
    return ''.join(out)
|
|
|
def maybe_hexdump(s):
    """
    Return ``s`` unchanged if every value in it is printable, otherwise
    return ``hexdump(s)``.

    Each element of ``s`` is passed through ``chr()``, so ``s`` must iterate
    as integers (for example a ``bytes`` object).
    """
    for value in s:
        if chr(value) not in string.printable:
            return hexdump(s)
    return s
|
|
|
def print_table(coldata, rows):
    """
    Write a table to standard output.

    Coldata: List of dicts describing the columns.
        ``name (optional)`` is the heading for the column,
        ``width (optional)`` is the maximum width before truncating. 0 (or
        absent) for unlimited.

    Rows: List of tuples with the data to print. A cell is either a plain
    value (printed via ``str()``) or a dict with a ``data`` entry plus an
    optional ``color`` (string written before the text, followed by
    ``Colors.ENDC``) or ``formatter`` (callable given the full text and the
    column width, whose return value is written instead of the text).

    If at least one column has a non-empty name, a heading row wrapped in
    ``Styles.TABLE_HEADER`` / ``Colors.ENDC`` is printed first.
    """

    def cell_text(cell):
        # Text used for measuring and printing a cell.
        if isinstance(cell, dict):
            return str(cell['data'])
        return str(cell)

    # Headings: a column without a name contributes an empty heading.
    headings = [spec['name'] if 'name' in spec else '' for spec in coldata]
    has_headings = any(title != '' for title in headings)
    if has_headings:
        rows = [headings] + rows

    # Width of each column: the widest cell, capped by a positive ``width``.
    widths = []
    for index, spec in enumerate(coldata):
        limit = spec['width'] if 'width' in spec and spec['width'] > 0 else 0
        widest = 0
        for row in rows:
            widest = max(widest, len(cell_text(row[index])))
        if limit > 0 and widest > limit:
            widths.append(limit)
        else:
            widths.append(widest)

    # Print rows
    gap = ' ' * 2
    heading_pending = has_headings
    for row in rows:
        if heading_pending:
            sys.stdout.write(Styles.TABLE_HEADER)
        for cell, width in zip(row, widths):
            color = None
            formatter = None
            if isinstance(cell, dict):
                text = str(cell['data'])
                # ``color`` takes precedence over ``formatter``.
                if 'color' in cell:
                    color = cell['color']
                elif 'formatter' in cell:
                    formatter = cell['formatter']
            else:
                text = str(cell)

            # Over-long text is cut to the column width, with the last three
            # characters of the cut replaced by an ellipsis.
            if len(text) > width:
                shown = text[:width][:-3] + '...'
            else:
                shown = text

            if color is not None:
                sys.stdout.write(color)
                sys.stdout.write(shown)
                sys.stdout.write(Colors.ENDC)
            elif formatter is not None:
                # The formatter gets the untruncated text and the width.
                sys.stdout.write(formatter(text, width))
            else:
                sys.stdout.write(shown)
            # Pad up to the column width (no padding when text is too long).
            sys.stdout.write(' ' * (width - len(text)))
            sys.stdout.write(gap)
        if heading_pending:
            sys.stdout.write(Colors.ENDC)
            heading_pending = False
        sys.stdout.write('\n')
    sys.stdout.flush()
|
|
|
def print_requests(requests, client=None):
    """
    Print a table describing each request in ``requests``.

    Every request is turned into a row with ``get_req_data_row`` (forwarding
    ``client``) and the rows are handed to ``print_request_rows``.
    """
    print_request_rows(
        [get_req_data_row(req, client=client) for req in requests]
    )
|
|
|
def print_request_rows(request_rows):
    """
    Print a table from rows produced by ``get_req_data_row``.

    Each row must unpack into nine values: id, verb, host, path, status
    code, request length, response length, time and mangle marker. The
    verb, status code and host cells are given a color, the path cell is
    given ``path_formatter``, and the result is passed to ``print_table``.
    """
    # Column layout of the request table
    columns = [
        {'name':'ID'},
        {'name':'Verb'},
        {'name': 'Host'},
        {'name':'Path', 'width':40},
        {'name':'S-Code', 'width':16},
        {'name':'Req Len'},
        {'name':'Rsp Len'},
        {'name':'Time'},
        {'name':'Mngl'},
    ]
    table_rows = []
    for (reqid, verb, host, path, scode, qlen, slen, elapsed, mngl) in request_rows:
        verb_cell = {'data':verb, 'color':verb_color(verb)}
        scode_cell = {'data':scode, 'color':scode_color(scode)}
        host_cell = {'data':host, 'color':color_string(host, color_only=True)}
        path_cell = {'data':path, 'formatter':path_formatter}
        table_rows.append(
            (reqid, verb_cell, host_cell, path_cell, scode_cell,
             qlen, slen, elapsed, mngl)
        )
    print_table(columns, table_rows)
|
|
|
def get_req_data_row(request, client=None):
    """
    Build the table row for a single request.

    Returns a list of nine values: request id (``client.get_reqid(request)``
    when a client is given, otherwise ``request.db_id``), method, host, URL,
    response status text, request length, response length, elapsed seconds
    and mangle marker.
    """
    rid = request.db_id if client is None else client.get_reqid(request)

    # The port is only shown when it differs from 443 (TLS) or 80 (no TLS).
    default_port = 443 if request.use_tls else 80
    if request.dest_port != default_port:
        host = "%s:%d" % (request.dest_host, request.dest_port)
    else:
        host = request.dest_host

    # 'q' marks a request with an unmangled version, 's' a response with one.
    mangle_flags = []
    if request.unmangled:
        mangle_flags.append('q')

    response = request.response
    if response:
        response_code = str(response.status_code) + ' ' + response.reason
        rsplen = response.content_length
        if response.unmangled:
            mangle_flags.append('s')
    else:
        response_code = ''
        rsplen = 'N/A'
    mangle_str = '/'.join(mangle_flags) or '--'

    if request.time_start and request.time_end:
        elapsed = request.time_end - request.time_start
        time_str = "%.2f" % elapsed.total_seconds()
    else:
        time_str = '--'

    return [
        rid,
        request.method,
        host,
        request.url.geturl(),
        response_code,
        request.content_length,
        rsplen,
        time_str,
        mangle_str,
    ]
|
|
|
def confirm(message, default='n'):
    """
    Ask the user a yes/no question and return the answer as a bool.

    ``message`` is printed, then the user is prompted with ``(Y/n)`` or
    ``(y/N)`` depending on the default. An empty answer returns the default;
    otherwise the result is True only if the answer starts with ``y`` or
    ``Y``. The default is "no" whenever ``default`` contains the letter
    ``n`` (case-insensitive), and "yes" otherwise.
    """
    default_answer = 'n' not in default.lower()

    print(message)
    prompt = '(Y/n) ' if default_answer else '(y/N) '
    answer = input(prompt)

    if not answer:
        return default_answer
    return answer[0].lower() == 'y'
|
|
|
# Taken from http://stackoverflow.com/questions/4770297/python-convert-utc-datetime-string-to-local-datetime |
|
def utc2local(utc):
    """
    Shift a naive UTC datetime by the local UTC offset.

    The offset is the difference between the local and the UTC rendering of
    the same timestamp, where the timestamp is obtained by passing
    ``utc.timetuple()`` to ``time.mktime``.

    :param utc: Naive datetime holding a UTC time
    :type utc: datetime.datetime
    :rtype: datetime.datetime (naive)
    """
    epoch = time.mktime(utc.timetuple())
    local_naive = datetime.datetime.fromtimestamp(epoch)
    # Same value as the deprecated datetime.utcfromtimestamp(epoch): build an
    # aware UTC datetime, then drop the tzinfo so it can be subtracted from
    # the naive local one.
    utc_naive = datetime.datetime.fromtimestamp(
        epoch, datetime.timezone.utc
    ).replace(tzinfo=None)
    return utc + (local_naive - utc_naive)
|
|
|
def datetime_string(dt):
    """
    Format ``dt`` for display after converting it with ``utc2local``.

    The output looks like ``Mon, Jan 01, 2020, 01:02:03.000004 PM``
    (exact names depend on the locale used by ``strftime``).
    """
    return utc2local(dt).strftime('%a, %b %d, %Y, %I:%M:%S.%f %p')
|
|
|
def copy_to_clipboard(text):
    """
    Pass ``text`` to ``copy`` from the sibling ``clip`` module.

    The import happens at call time rather than at module import.
    """
    from .clip import copy as clip_copy
    clip_copy(text)
|
|
|
def clipboard_contents():
    """
    Return whatever ``paste`` from the sibling ``clip`` module returns.

    The import happens at call time rather than at module import.
    """
    from .clip import paste as clip_paste
    return clip_paste()
|
|
|
def encode_basic_auth(username, password):
    """
    Build a basic auth header value from ``username`` and ``password``.

    Returns ``'Basic '`` followed by the base64 encoding of
    ``username:password``.
    """
    credentials = ':'.join((str(username), str(password)))
    token = base64.b64encode(credentials.encode()).decode()
    return 'Basic ' + token
|
|
|
def parse_basic_auth(header):
    """
    Parse a raw basic auth header and return (username, password)

    :param header: Header value of the form ``'<scheme> <base64 data>'``
        where the data decodes to ``username:password``
    :type header: String
    :rtype: Tuple of (String, String)
    :raises ValueError: if the header has no space, or the decoded data is
        not valid base64/UTF-8 or contains no ``:``
    """
    _, creds = header.split(' ', 1)
    # b64decode returns bytes; decode to text before splitting, since
    # splitting bytes on the str ':' raises TypeError.
    decoded = base64.b64decode(creds).decode()
    # Only the first ':' separates the fields, so passwords may contain ':'.
    username, password = decoded.split(':', 1)
    return (username, password)
|
|
|
def print_query(query):
    """
    Print one line per entry of ``query``.

    Each entry is an iterable of filters and each filter an iterable of
    strings. The strings of a filter are joined with spaces, and the filters
    of an entry are joined with a blue ``OR``.
    """
    for phrase in query:
        filter_strs = [' '.join(parts) for parts in phrase]
        separator = Colors.BLUE+' OR '+Colors.ENDC
        print(separator.join(filter_strs))
|
|
|
def log_error(msg):
    """
    Report an error message by printing ``msg`` to standard output.
    """
    print(msg)
|
|
|
def autocomplete_startswith(text, lst, allow_spaces=False):
    """
    Return the entries of ``lst`` that start with ``text``.

    An empty ``text`` matches every entry (a copy of ``lst`` is used).
    Unless ``allow_spaces`` is true, entries containing a space are dropped.
    """
    if text:
        candidates = [entry for entry in lst if entry.startswith(text)]
    else:
        candidates = lst[:]
    if allow_spaces:
        return candidates
    return [entry for entry in candidates if ' ' not in entry]
|
|
|
def load_reqlist(client, reqids, headers_only=False):
    """
    Generate the requests named by the comma-separated id string ``reqids``.

    If one of the ids is ``*``, every request from
    ``client.in_context_requests_iter`` is yielded first. Then each id is
    looked up with ``client.req_by_id``; an id whose lookup raises is
    reported by printing the exception and is otherwise skipped.
    """
    id_list = re.split(r",\s*", reqids)
    if '*' in id_list:
        for in_context_req in client.in_context_requests_iter(headers_only=headers_only):
            yield in_context_req
    # NOTE(review): a '*' entry is also passed to req_by_id below; whether
    # that lookup fails (and prints an error) depends on the client -- confirm.
    for reqid in id_list:
        try:
            yield client.req_by_id(reqid, headers_only=headers_only)
        except Exception as e:
            print(e)
|
|
|
# Taken from http://stackoverflow.com/questions/16571150/how-to-capture-stdout-output-from-a-python-function-call |
|
# then modified |
|
class Capturing:
    """
    Context manager that captures everything written to ``sys.stdout``.

    While the block is active ``sys.stdout`` is replaced by a ``StringIO``.
    On exit the previous ``sys.stdout`` is restored and the captured text is
    available as the ``val`` attribute. Exceptions raised inside the block
    are not suppressed.
    """

    def __enter__(self):
        # Remember the current stream so it can be put back on exit.
        self._stdout = sys.stdout
        self._stringio = StringIO()
        sys.stdout = self._stringio
        return self

    def __exit__(self, *args):
        sys.stdout = self._stdout
        self.val = self._stringio.getvalue()
|
|
|