239 lines
8.3 KiB
239 lines
8.3 KiB
#!/usr/bin/env python3 |
|
# Original Source: https://github.com/oblitum/dotfiles/blob/ArchLinux/.local/bin/MIMEmbellish |
|
|
|
import re |
|
import sys |
|
import email |
|
import shlex |
|
import mimetypes |
|
import subprocess |
|
from copy import copy |
|
from hashlib import md5 |
|
from email import charset |
|
from email import encoders |
|
from email.mime.text import MIMEText |
|
from email.mime.multipart import MIMEMultipart |
|
from email.mime.nonmultipart import MIMENonMultipart |
|
from os.path import basename, splitext, expanduser |
|
|
|
|
|
charset.add_charset('utf-8', charset.SHORTEST, '8bit') |
|
|
|
|
|
def pandoc(from_format, to_format='markdown', plain='markdown', title=None): |
|
markdown = ('markdown' |
|
'-blank_before_blockquote') |
|
|
|
if from_format == 'plain': |
|
from_format = plain |
|
if from_format == 'markdown': |
|
from_format = markdown |
|
if to_format == 'markdown': |
|
to_format = markdown |
|
|
|
command = 'pandoc -f {} -t {} --standalone --highlight-style=tango' |
|
if to_format in ('html', 'html5'): |
|
if title is not None: |
|
command += ' --variable=pagetitle:{}'.format(shlex.quote(title)) |
|
command += ' --webtex --template={}'.format( |
|
expanduser('~/.pandoc/templates/email.html')) |
|
return command.format(from_format, to_format) |
|
|
|
|
|
def gmailfy(payload): |
|
return payload.replace('<blockquote>', |
|
'<blockquote class="gmail_quote" style="' |
|
'padding: 0 7px 0 7px;' |
|
'border-left: 2px solid #cccccc;' |
|
'font-style: italic;' |
|
'margin: 0 0 7px 3px;' |
|
'">') |
|
|
|
|
|
def make_alternative(message, part): |
|
alternative = convert(part, 'html', |
|
pandoc(part.get_content_subtype(), |
|
to_format='html', |
|
title=message.get('Subject'))) |
|
alternative.set_payload(gmailfy(alternative.get_payload())) |
|
return alternative |
|
|
|
|
|
def make_replacement(message, part): |
|
return convert(part, 'plain', pandoc(part.get_content_subtype())) |
|
|
|
|
|
def convert(part, to_subtype, command): |
|
payload = part.get_payload() |
|
if isinstance(payload, str): |
|
payload = payload.encode('utf-8') |
|
else: |
|
payload = part.get_payload(None, True) |
|
if not isinstance(payload, bytes): |
|
payload = payload.encode('utf-8') |
|
process = subprocess.run( |
|
shlex.split(command), |
|
input=payload, stdout=subprocess.PIPE, check=True) |
|
return MIMEText(process.stdout, to_subtype, 'utf-8') |
|
|
|
|
|
def with_alternative(parent, part, from_signed, |
|
make_alternative=make_alternative, |
|
make_replacement=None): |
|
try: |
|
alternative = make_alternative(parent or part, from_signed or part) |
|
replacement = (make_replacement(parent or part, part) |
|
if from_signed is None and make_replacement is not None |
|
else part) |
|
except: |
|
return parent or part |
|
envelope = MIMEMultipart('alternative') |
|
if parent is None: |
|
for k, v in part.items(): |
|
if (k.lower() != 'mime-version' |
|
and not k.lower().startswith('content-')): |
|
envelope.add_header(k, v) |
|
del part[k] |
|
envelope.attach(replacement) |
|
envelope.attach(alternative) |
|
if parent is None: |
|
return envelope |
|
payload = parent.get_payload() |
|
payload[payload.index(part)] = envelope |
|
return parent |
|
|
|
|
|
def tag_attachments(message): |
|
if message.get_content_type() == 'multipart/mixed': |
|
for part in message.get_payload(): |
|
if (part.get_content_maintype() in ['image'] |
|
and 'Content-ID' not in part): |
|
filename = part.get_param('filename', |
|
header='Content-Disposition') |
|
if isinstance(filename, tuple): |
|
filename = str(filename[2], filename[0] or 'us-ascii') |
|
if filename: |
|
filename = splitext(basename(filename))[0] |
|
if filename: |
|
part.add_header('Content-ID', '<{}>'.format(filename)) |
|
return message |
|
|
|
|
|
def attachment_from_file_path(attachment_path): |
|
try: |
|
mime, encoding = mimetypes.guess_type(attachment_path, strict=False) |
|
maintype, subtype = mime.split('/') |
|
with open(attachment_path, 'rb') as payload: |
|
attachment = MIMENonMultipart(maintype, subtype) |
|
attachment.set_payload(payload.read()) |
|
encoders.encode_base64(attachment) |
|
if encoding: |
|
attachment.add_header('Content-Encoding', encoding) |
|
return attachment |
|
except: |
|
return None |
|
|
|
|
|
attachment_path_pattern = re.compile(r'\]\s*\(\s*file://(/[^)]*\S)\s*\)|' |
|
r'\]\s*:\s*file://(/.*\S)\s*$', |
|
re.MULTILINE) |
|
|
|
|
|
def link_attachments(payload): |
|
attached = [] |
|
attachments = [] |
|
|
|
def on_match(match): |
|
if match.group(1): |
|
attachment_path = match.group(1) |
|
cid_fmt = '](cid:{})' |
|
else: |
|
attachment_path = match.group(2) |
|
cid_fmt = ']: cid:{}' |
|
attachment_id = md5(attachment_path.encode()).hexdigest() |
|
if attachment_id in attached: |
|
return cid_fmt.format(attachment_id) |
|
attachment = attachment_from_file_path(attachment_path) |
|
if attachment: |
|
attachment.add_header('Content-ID', '<{}>'.format(attachment_id)) |
|
attachments.append(attachment) |
|
attached.append(attachment_id) |
|
return cid_fmt.format(attachment_id) |
|
return match.group() |
|
|
|
return attachments, attachment_path_pattern.sub(on_match, payload) |
|
|
|
|
|
def with_local_attachments(parent, part, from_signed, |
|
link_attachments=link_attachments): |
|
if from_signed is None: |
|
attachments, payload = link_attachments(part.get_payload()) |
|
part.set_payload(payload) |
|
else: |
|
attachments, payload = link_attachments(from_signed.get_payload()) |
|
from_signed = copy(from_signed) |
|
from_signed.set_payload(payload) |
|
if not attachments: |
|
return parent, part, from_signed |
|
if parent is None: |
|
parent = MIMEMultipart('mixed') |
|
for k, v in part.items(): |
|
if (k.lower() != 'mime-version' |
|
and not k.lower().startswith('content-')): |
|
parent.add_header(k, v) |
|
del part[k] |
|
parent.attach(part) |
|
for attachment in attachments: |
|
parent.attach(attachment) |
|
return parent, part, from_signed |
|
|
|
|
|
def is_target(part, target_subtypes): |
|
return (part.get('Content-Disposition', 'inline') == 'inline' |
|
and part.get_content_maintype() == 'text' |
|
and part.get_content_subtype() in target_subtypes) |
|
|
|
|
|
def pick_from_signed(part, target_subtypes): |
|
for from_signed in part.get_payload(): |
|
if is_target(from_signed, target_subtypes): |
|
return from_signed |
|
|
|
|
|
def seek_target(message, target_subtypes=['plain', 'markdown']): |
|
if message.is_multipart(): |
|
if message.get_content_type() == 'multipart/signed': |
|
part = pick_from_signed(message, target_subtypes) |
|
if part is not None: |
|
return None, message, part |
|
elif message.get_content_type() == 'multipart/mixed': |
|
for part in message.get_payload(): |
|
if part.is_multipart(): |
|
if part.get_content_type() == 'multipart/signed': |
|
from_signed = pick_from_signed(part, target_subtypes) |
|
if from_signed is not None: |
|
return message, part, from_signed |
|
elif is_target(part, target_subtypes): |
|
return message, part, None |
|
else: |
|
if is_target(message, target_subtypes): |
|
return None, message, None |
|
return None, None, None |
|
|
|
|
|
def main(): |
|
try: |
|
message = email.message_from_file(sys.stdin) |
|
parent, part, from_signed = seek_target(message) |
|
if (parent, part, from_signed) == (None, None, None): |
|
print(message) |
|
return |
|
tag_attachments(message) |
|
print(with_alternative( |
|
*with_local_attachments(parent, part, from_signed))) |
|
except (BrokenPipeError, KeyboardInterrupt): |
|
pass |
|
|
|
|
|
if __name__ == '__main__': |
|
main()
|
|
|