import re import sys import string import time import datetime import base64 from pygments.formatters import TerminalFormatter from pygments.lexers import get_lexer_for_mimetype, HttpLexer from pygments import highlight from io import StringIO from .colors import Colors, Styles, verb_color, scode_color, path_formatter, color_string def str_hash_code(s): h = 0 n = len(s)-1 for c in s.encode(): h += c*31**n n -= 1 return h def printable_data(data, colors=True): """ Return ``data``, but replaces unprintable characters with periods. :param data: The data to make printable :type data: String :rtype: String """ chars = [] colored = False for c in data: if chr(c) in string.printable: if colored and colors: chars.append(Colors.ENDC) colored = False chars.append(chr(c)) else: if (not colored) and colors: chars.append(Styles.UNPRINTABLE_DATA) colored = True chars.append('.') if colors: chars.append(Colors.ENDC) return ''.join(chars) def remove_color(s): ansi_escape = re.compile(r'\x1b[^m]*m') return ansi_escape.sub('', s) def hexdump(src, length=16): FILTER = ''.join([(len(repr(chr(x))) == 3) and chr(x) or '.' for x in range(256)]) lines = [] for c in range(0, len(src), length): chars = src[c:c+length] hex = ' '.join(["%02x" % x for x in chars]) printable = ''.join(["%s" % ((x <= 127 and FILTER[x]) or Styles.UNPRINTABLE_DATA+'.'+Colors.ENDC) for x in chars]) lines.append("%04x %-*s %s\n" % (c, length*3, hex, printable)) return ''.join(lines) def maybe_hexdump(s): if any(chr(c) not in string.printable for c in s): return hexdump(s) return s def print_table(coldata, rows): """ Print a table. Coldata: List of dicts with info on how to print the columns. ``name`` is the heading to give column, ``width (optional)`` maximum width before truncating. 0 for unlimited. Rows: List of tuples with the data to print """ # Get the width of each column widths = [] headers = [] for data in coldata: if 'name' in data: headers.append(data['name']) else: headers.append('') empty_headers = True for h in headers: if h != '': empty_headers = False if not empty_headers: rows = [headers] + rows for i in range(len(coldata)): col = coldata[i] if 'width' in col and col['width'] > 0: maxwidth = col['width'] else: maxwidth = 0 colwidth = 0 for row in rows: printdata = row[i] if isinstance(printdata, dict): collen = len(str(printdata['data'])) else: collen = len(str(printdata)) if collen > colwidth: colwidth = collen if maxwidth > 0 and colwidth > maxwidth: widths.append(maxwidth) else: widths.append(colwidth) # Print rows padding = 2 is_heading = not empty_headers for row in rows: if is_heading: sys.stdout.write(Styles.TABLE_HEADER) for (col, width) in zip(row, widths): if isinstance(col, dict): printstr = str(col['data']) if 'color' in col: colors = col['color'] formatter = None elif 'formatter' in col: colors = None formatter = col['formatter'] else: colors = None formatter = None else: printstr = str(col) colors = None formatter = None if len(printstr) > width: trunc_printstr=printstr[:width] trunc_printstr=trunc_printstr[:-3]+'...' else: trunc_printstr=printstr if colors is not None: sys.stdout.write(colors) sys.stdout.write(trunc_printstr) sys.stdout.write(Colors.ENDC) elif formatter is not None: toprint = formatter(printstr, width) sys.stdout.write(toprint) else: sys.stdout.write(trunc_printstr) sys.stdout.write(' '*(width-len(printstr))) sys.stdout.write(' '*padding) if is_heading: sys.stdout.write(Colors.ENDC) is_heading = False sys.stdout.write('\n') sys.stdout.flush() def print_requests(requests, client=None): """ Takes in a list of requests and prints a table with data on each of the requests. It's the same table that's used by ``ls``. """ rows = [] for req in requests: rows.append(get_req_data_row(req, client=client)) print_request_rows(rows) def print_request_rows(request_rows): """ Takes in a list of request rows generated from :func:`pappyproxy.console.get_req_data_row` and prints a table with data on each of the requests. Used instead of :func:`pappyproxy.console.print_requests` if you can't count on storing all the requests in memory at once. """ # Print a table with info on all the requests in the list cols = [ {'name':'ID'}, {'name':'Verb'}, {'name': 'Host'}, {'name':'Path', 'width':40}, {'name':'S-Code', 'width':16}, {'name':'Req Len'}, {'name':'Rsp Len'}, {'name':'Time'}, {'name':'Mngl'}, ] print_rows = [] for row in request_rows: (reqid, verb, host, path, scode, qlen, slen, time, mngl) = row verb = {'data':verb, 'color':verb_color(verb)} scode = {'data':scode, 'color':scode_color(scode)} host = {'data':host, 'color':color_string(host, color_only=True)} path = {'data':path, 'formatter':path_formatter} print_rows.append((reqid, verb, host, path, scode, qlen, slen, time, mngl)) print_table(cols, print_rows) def get_req_data_row(request, client=None): """ Get the row data for a request to be printed. """ if client is not None: rid = client.get_reqid(request) else: rid = request.db_id method = request.method host = request.dest_host if not request.use_tls and request.dest_port != 80: host = "%s:%d" % (request.dest_host, request.dest_port) if request.use_tls and request.dest_port != 443: host = "%s:%d" % (request.dest_host, request.dest_port) path = request.url.geturl() reqlen = request.content_length rsplen = 'N/A' mangle_str = '--' if request.unmangled: mangle_str = 'q' if request.response: response_code = str(request.response.status_code) + \ ' ' + request.response.reason rsplen = request.response.content_length if request.response.unmangled: if mangle_str == '--': mangle_str = 's' else: mangle_str += '/s' else: response_code = '' time_str = '--' if request.time_start and request.time_end: time_delt = request.time_end - request.time_start time_str = "%.2f" % time_delt.total_seconds() return [rid, method, host, path, response_code, reqlen, rsplen, time_str, mangle_str] def confirm(message, default='n'): """ A helper function to get confirmation from the user. It prints ``message`` then asks the user to answer yes or no. Returns True if the user answers yes, otherwise returns False. """ if 'n' in default.lower(): default = False else: default = True print(message) if default: answer = input('(Y/n) ') else: answer = input('(y/N) ') if not answer: return default if answer[0].lower() == 'y': return True else: return False # Taken from def utc2local(utc): epoch = time.mktime(utc.timetuple()) offset = datetime.datetime.fromtimestamp(epoch) - datetime.datetime.utcfromtimestamp(epoch) return utc + offset def datetime_string(dt): dtobj = utc2local(dt) time_made_str = dtobj.strftime('%a, %b %d, %Y, %I:%M:%S.%f %p') return time_made_str def copy_to_clipboard(text): from .clip import copy copy(text) def clipboard_contents(): from .clip import paste return paste() def encode_basic_auth(username, password): decoded = '%s:%s' % (username, password) encoded = base64.b64encode(decoded.encode()) header = 'Basic %s' % encoded.decode() return header def parse_basic_auth(header): """ Parse a raw basic auth header and return (username, password) """ _, creds = header.split(' ', 1) decoded = base64.b64decode(creds) username, password = decoded.split(':', 1) return (username, password) def print_query(query): for p in query: fstrs = [] for f in p: fstrs.append(' '.join(f)) print((Colors.BLUE+' OR '+Colors.ENDC).join(fstrs)) def log_error(msg): print(msg) def autocomplete_startswith(text, lst, allow_spaces=False): ret = None if not text: ret = lst[:] else: ret = [n for n in lst if n.startswith(text)] if not allow_spaces: ret = [s for s in ret if ' ' not in s] return ret def load_reqlist(client, reqids, headers_only=False): ids = re.compile(r",\s*").split(reqids) if '*' in ids: for req in client.in_context_requests_iter(headers_only=headers_only): yield req for i in ids: try: yield client.req_by_id(i, headers_only=headers_only) except Exception as e: print(e) # Taken from # then modified class Capturing(): def __enter__(self): self._stdout = sys.stdout sys.stdout = self._stringio = StringIO() return self def __exit__(self, *args): self.val = self._stringio.getvalue() sys.stdout = self._stdout