diff options
-rwxr-xr-x | app.py | 133 | ||||
-rw-r--r-- | config.def.py | 12 |
2 files changed, 91 insertions, 54 deletions
@@ -21,12 +21,14 @@ from flask import Flask from flask import render_template, request, redirect, abort +from werkzeug.wrappers.response import Response as werkzeugResponse from flask.wrappers import Response import sys from email.mime.text import MIMEText from subprocess import Popen, PIPE +from typing import Optional, Any, Union from html import escape import time, os import quopri @@ -37,8 +39,10 @@ import mailbox import config from markupsafe import Markup +db: list[dict[str, Optional[str]]] -def load_database(user): + +def load_database(user: str) -> list[dict[str, Optional[str]]]: try: db_file = open("%s" % config.MAPPING[user][1], "r+") except FileNotFoundError: @@ -53,24 +57,28 @@ def load_database(user): app = Flask(__name__) -def generate_user_list_from_mapping(mapping): +def generate_user_list_from_mapping( + mapping: dict[str, tuple[str, str, str, str]] +) -> str: generated_user_list = "" for username, usertuple in mapping.items(): - generated_user_list += "<li><a href=\"/" + generated_user_list += '<li><a href="/' generated_user_list += username - generated_user_list += "\">" + generated_user_list += '">' generated_user_list += usertuple[3] generated_user_list += "</a> [Email: " - generated_user_list += usertuple[0].replace("@", " <i>at</i> ").replace(".", " <i>dot</i> ") - generated_user_list += ", homepage: <a href=\"" + generated_user_list += ( + usertuple[0].replace("@", " <i>at</i> ").replace(".", " <i>dot</i> ") + ) + generated_user_list += ', homepage: <a href="' generated_user_list += usertuple[2] - generated_user_list += "\">" + generated_user_list += '">' generated_user_list += usertuple[2] generated_user_list += "</a>]</li>\n" return generated_user_list -def generate_past_questions_from_database(db): +def generate_past_questions_from_database(db: list[dict[str, Optional[str]]]) -> str: generated_questions_html_listing = "" for qs in reversed(db): if not qs["a"]: @@ -78,27 +86,36 @@ def generate_past_questions_from_database(db): generated_questions_html_listing += "<hr />" generated_questions_html_listing += '<div class="single-past-question">' generated_questions_html_listing += '<pre class="past-question-question">' - generated_questions_html_listing += escape(qs["q"]) # questions are not trusted and must be escaped + assert type(qs["q"]) is str + generated_questions_html_listing += escape( + qs["q"] + ) # questions are not trusted and must be escaped generated_questions_html_listing += "</pre>" generated_questions_html_listing += '<span class="past-question-answer">' - generated_questions_html_listing += qs["a"] # answers are trusted and may include HTML + generated_questions_html_listing += qs[ + "a" + ] # answers are trusted and may include HTML generated_questions_html_listing += "</span>" generated_questions_html_listing += "</div>" return generated_questions_html_listing -def dump_database(user, db): - #with open(config.MAPPING[user][1], "w") as db_file: + +def dump_database(user: str, db: list[dict[str, Optional[str]]]) -> None: + # with open(config.MAPPING[user][1], "w") as db_file: # I've replaced this so that if the write fails it doesn't corrupt the JSON file fn = config.MAPPING[user][1] - with open(fn + '.tmp', 'w') as db_file: + with open(fn + ".tmp", "w") as db_file: json.dump(db, db_file, indent=4) - os.replace(fn + '.tmp', fn) + os.replace(fn + ".tmp", fn) + re_named_address = re.compile("(.*) <(.*)\\@(.*)>") re_unnamed_address = re.compile("^[<](.*)\\@(.*)[>]$") -def parse_address(s): # Returns name, user, host. +def parse_address( + s: str, +) -> Optional[tuple[Optional[str], str, str]]: # Returns name, user, host. attempt_named_address = re_named_address.match(s) if attempt_named_address: return ( @@ -119,7 +136,7 @@ def parse_address(s): # Returns name, user, host. @app.route("/<user>", methods=["GET", "POST"]) -def qboard(user): +def qboard(user: str) -> Union[Response, werkzeugResponse, str, tuple[str, int]]: if user not in config.MAPPING: return render_template("unknown_user.html", faulty_username=user) elif request.method == "GET": @@ -129,24 +146,29 @@ def qboard(user): # EMAIL STUFF IS TO BE ADDED HERE # Why not in a separate email_stuff() function? # Because lazy - mbox = mailbox.Maildir('/home/qbox/Mail/Inbox') + mbox = mailbox.Maildir("/home/qbox/Mail/Inbox") for msg_id, msg in mbox.items(): - if msg.get_subdir() != "new": continue + if msg.get_subdir() != "new": + continue mbox.lock() msg.add_flag("SR") - msg.set_subdir("cur") # apparently it's not doing so + msg.set_subdir("cur") # apparently it's not doing so mbox[msg_id] = msg mbox.flush() mbox.unlock() parsed_address = parse_address(msg["From"]) + if not parsed_address: + print("WARNING: cannot access emails due to address parsing error") + break from_address = parsed_address[1] + "@" + parsed_address[2] - if from_address != config.MAPPING[user][0]: continue - if 'In-Reply-To' not in msg.keys() or not msg['In-Reply-To']: + if from_address != config.MAPPING[user][0]: + continue + if "In-Reply-To" not in msg.keys() or not msg["In-Reply-To"]: ts = str(time.time()) newmsg = MIMEText( f"Hello {config.MAPPING[user][3]},\n\nI cannot understand this unsolicited message. If you are trying to reply to a message, be sure to use the ``reply'' feature in your email client to indicate that you are reply to that specific notification of mine. If something sounds wrong, contact your server administrator.\n\nQuestion Box System" ) - newmsg["From"] = config.MAIL_ADDR + newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST newmsg["To"] = msg["From"] if msg["Subject"].startswith("Re: "): newmsg["Subject"] = msg["Subject"] @@ -159,10 +181,16 @@ def qboard(user): newmsg["Message-Id"] = "<qbox-system-%s@%s>" % (ts, config.MAIL_HOST) p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE) p.communicate(newmsg.as_bytes()) - return # This return was missing - - reply_identifier = parse_address(str(msg['In-Reply-To']))[1] # Should be ts + continue + # return # This return was missing + + parsed_address = parse_address(str(msg["In-Reply-To"])) + if not parsed_address: + print("WARNING: cannot access emails due to address parsing error") + break + reply_identifier = parsed_address[1] # Should be ts for question in reversed(db): + assert type(question["ts"]) is str if reply_identifier == "qbox-" + question["ts"]: break else: @@ -170,7 +198,7 @@ def qboard(user): newmsg = MIMEText( f"Hello {config.MAPPING[user][3]},\n\nYou sent me a message, which was supposedly a reply to one of my notifications. However, I do not recognize your In-Reply-To header as one of my Message-IDs, so I don't know how to handle your message. Maybe you could check if you are using your email client's ``reply'' feature correctly, or if the database has been modified to remove the notified post?\n\nThings are getting weird. Contact your server administrator if confused.\n\nQuestion Box System" ) - newmsg["From"] = config.MAIL_ADDR + newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST newmsg["To"] = msg["From"] if msg["Subject"].startswith("Re: "): newmsg["Subject"] = msg["Subject"] @@ -183,7 +211,7 @@ def qboard(user): newmsg["Message-Id"] = "<qbox-system-%s@%s>" % (ts, config.MAIL_HOST) p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE) p.communicate(newmsg.as_bytes()) - return + continue for part in msg.walk(): if part.get_content_type() == "text/plain": @@ -197,7 +225,7 @@ def qboard(user): newmsg = MIMEText( f"Hello {config.MAPPING[user][3]},\n\nYour reply was in an incorrect format. Please ensure that it includes at least one subpart of MIME type ``text/plain'' (or ``text/html'' but that's not recommended).\n\nQuestion Box System" ) - newmsg["From"] = config.MAIL_ADDR + newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST newmsg["To"] = msg["From"] if msg["Subject"].startswith("Re: "): newmsg["Subject"] = msg["Subject"] @@ -207,15 +235,21 @@ def qboard(user): newmsg["In-Reply-To"] = msg["Message-ID"] elif "Message-Id" in msg.keys(): newmsg["In-Reply-To"] = msg["Message-Id"] - newmsg["Message-Id"] = "<qbox-system-%s@%s>" % (ts, config.MAIL_HOST) + newmsg["Message-Id"] = "<qbox-system-%s@%s>" % ( + ts, + config.MAIL_HOST, + ) p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE) p.communicate(newmsg.as_bytes()) - return + continue - received_message_text_plain = part.get_payload(decode=True).decode("utf-8", "surrogateescape") + received_message_text_plain = part.get_payload(decode=True).decode( + "utf-8", "surrogateescape" + ) if part.get_content_type() == "text/plain": - received_message_text_decoded = received_message_text_plain.replace("\r\n", "<br />").replace("\n", "<br />") - + received_message_text_decoded = received_message_text_plain.replace( + "\r\n", "<br />" + ).replace("\n", "<br />") db.remove(question) question["a"] = received_message_text_decoded @@ -225,8 +259,8 @@ def qboard(user): ts = str(time.time()) newmsg = MIMEText( f"Hello {config.MAPPING[user][3]},\n\nI have received your message and I added it to the question board.\n\nQuestion Box System" - ) - newmsg["From"] = config.MAIL_ADDR + ) + newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST newmsg["To"] = msg["From"] if msg["Subject"].startswith("Re: "): newmsg["Subject"] = msg["Subject"] @@ -239,10 +273,13 @@ def qboard(user): newmsg["Message-Id"] = "<qbox-system-%s@%s>" % (ts, config.MAIL_HOST) p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE) p.communicate(newmsg.as_bytes()) - return render_template('qboard.html', - username = Markup(config.MAPPING[user][3]), - pq = Markup(generate_past_questions_from_database(db))) + return render_template( + "qboard.html", + username=Markup(config.MAPPING[user][3]), + pq=Markup(generate_past_questions_from_database(db)), + ) elif request.method == "POST": + assert type(request.content_length) is int if request.content_length > 1024 * 20: return "Your request is too large!!!" ts = str(time.time()) @@ -251,15 +288,15 @@ def qboard(user): db = load_database(user) db.append({"q": text, "a": None, "ts": ts}) dump_database(user, db) - msg = MIMEText( + newmsg = MIMEText( f"Hello {config.MAPPING[user][3]},\n\nThe following message was received in your ({user}'s) question box at server timestamp {ts}. Please reply to this in plain text email, as in the MIME type should be ``text/plain''; you may handwrite HTML in your reply; newlines will be automatically converted to ``<br />''s. Alternatively, HTML email will be accepted but are not recommended. Remember to remove any quoted text if your email client adds these automatically. Attachments will be ignored.\n\n{text}\n\nQuestion Box System" ) - msg["From"] = config.MAIL_ADDR - msg["To"] = config.MAPPING[user][0] - msg["Subject"] = "Question Box Message" - msg["Message-Id"] = "<qbox-%s@%s>" % (ts, config.MAIL_HOST) + newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST + newmsg["To"] = config.MAPPING[user][0] + newmsg["Subject"] = "Question Box Message" + newmsg["Message-Id"] = "<qbox-%s@%s>" % (ts, config.MAIL_HOST) p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE) - p.communicate(msg.as_bytes()) + p.communicate(newmsg.as_bytes()) else: return "Empty submissions are forbidden." return "Submission successful.\n\nPlease press the ``back'' button of your browser or otherwise return to the previous page." @@ -267,10 +304,10 @@ def qboard(user): @app.route("/", methods=["GET"]) -def index(): - return render_template('home.html', - userlist = Markup(generate_user_list_from_mapping(config.MAPPING))) - +def index() -> Union[Response, werkzeugResponse, str, tuple[str, int]]: + return render_template( + "home.html", userlist=Markup(generate_user_list_from_mapping(config.MAPPING)) + ) if __name__ == "__main__": diff --git a/config.def.py b/config.def.py index 624cab3..4d3fb2a 100644 --- a/config.def.py +++ b/config.def.py @@ -1,14 +1,14 @@ # Copy this to config.py MAPPING = { - "username": ( - "email@example.tld", - "path-to-config.json", - 'https://homepage.example.tld/', - 'Full Name' + "runxiyu": ( + "me@runxiyu.org", + "runxiyu.json", + 'https://runxiyu.org/', + 'Runxi Yu' ), } PORT = 5728 MAIL_HOST = "andrewyu.org" -MAIL_ADDR = "qbox@andrewyu.org" +MAIL_PREFIX = "qbox" |