aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xapp.py133
-rw-r--r--config.def.py12
2 files changed, 91 insertions, 54 deletions
diff --git a/app.py b/app.py
index 05d7aeb..4266858 100755
--- a/app.py
+++ b/app.py
@@ -21,12 +21,14 @@
from flask import Flask
from flask import render_template, request, redirect, abort
+from werkzeug.wrappers.response import Response as werkzeugResponse
from flask.wrappers import Response
import sys
from email.mime.text import MIMEText
from subprocess import Popen, PIPE
+from typing import Optional, Any, Union
from html import escape
import time, os
import quopri
@@ -37,8 +39,10 @@ import mailbox
import config
from markupsafe import Markup
+db: list[dict[str, Optional[str]]]
-def load_database(user):
+
+def load_database(user: str) -> list[dict[str, Optional[str]]]:
try:
db_file = open("%s" % config.MAPPING[user][1], "r+")
except FileNotFoundError:
@@ -53,24 +57,28 @@ def load_database(user):
app = Flask(__name__)
-def generate_user_list_from_mapping(mapping):
+def generate_user_list_from_mapping(
+ mapping: dict[str, tuple[str, str, str, str]]
+) -> str:
generated_user_list = ""
for username, usertuple in mapping.items():
- generated_user_list += "<li><a href=\"/"
+ generated_user_list += '<li><a href="/'
generated_user_list += username
- generated_user_list += "\">"
+ generated_user_list += '">'
generated_user_list += usertuple[3]
generated_user_list += "</a> [Email: "
- generated_user_list += usertuple[0].replace("@", " <i>at</i> ").replace(".", " <i>dot</i> ")
- generated_user_list += ", homepage: <a href=\""
+ generated_user_list += (
+ usertuple[0].replace("@", " <i>at</i> ").replace(".", " <i>dot</i> ")
+ )
+ generated_user_list += ', homepage: <a href="'
generated_user_list += usertuple[2]
- generated_user_list += "\">"
+ generated_user_list += '">'
generated_user_list += usertuple[2]
generated_user_list += "</a>]</li>\n"
return generated_user_list
-def generate_past_questions_from_database(db):
+def generate_past_questions_from_database(db: list[dict[str, Optional[str]]]) -> str:
generated_questions_html_listing = ""
for qs in reversed(db):
if not qs["a"]:
@@ -78,27 +86,36 @@ def generate_past_questions_from_database(db):
generated_questions_html_listing += "<hr />"
generated_questions_html_listing += '<div class="single-past-question">'
generated_questions_html_listing += '<pre class="past-question-question">'
- generated_questions_html_listing += escape(qs["q"]) # questions are not trusted and must be escaped
+ assert type(qs["q"]) is str
+ generated_questions_html_listing += escape(
+ qs["q"]
+ ) # questions are not trusted and must be escaped
generated_questions_html_listing += "</pre>"
generated_questions_html_listing += '<span class="past-question-answer">'
- generated_questions_html_listing += qs["a"] # answers are trusted and may include HTML
+ generated_questions_html_listing += qs[
+ "a"
+ ] # answers are trusted and may include HTML
generated_questions_html_listing += "</span>"
generated_questions_html_listing += "</div>"
return generated_questions_html_listing
-def dump_database(user, db):
- #with open(config.MAPPING[user][1], "w") as db_file:
+
+def dump_database(user: str, db: list[dict[str, Optional[str]]]) -> None:
+ # with open(config.MAPPING[user][1], "w") as db_file:
# I've replaced this so that if the write fails it doesn't corrupt the JSON file
fn = config.MAPPING[user][1]
- with open(fn + '.tmp', 'w') as db_file:
+ with open(fn + ".tmp", "w") as db_file:
json.dump(db, db_file, indent=4)
- os.replace(fn + '.tmp', fn)
+ os.replace(fn + ".tmp", fn)
+
re_named_address = re.compile("(.*) <(.*)\\@(.*)>")
re_unnamed_address = re.compile("^[<](.*)\\@(.*)[>]$")
-def parse_address(s): # Returns name, user, host.
+def parse_address(
+ s: str,
+) -> Optional[tuple[Optional[str], str, str]]: # Returns name, user, host.
attempt_named_address = re_named_address.match(s)
if attempt_named_address:
return (
@@ -119,7 +136,7 @@ def parse_address(s): # Returns name, user, host.
@app.route("/<user>", methods=["GET", "POST"])
-def qboard(user):
+def qboard(user: str) -> Union[Response, werkzeugResponse, str, tuple[str, int]]:
if user not in config.MAPPING:
return render_template("unknown_user.html", faulty_username=user)
elif request.method == "GET":
@@ -129,24 +146,29 @@ def qboard(user):
# EMAIL STUFF IS TO BE ADDED HERE
# Why not in a separate email_stuff() function?
# Because lazy
- mbox = mailbox.Maildir('/home/qbox/Mail/Inbox')
+ mbox = mailbox.Maildir("/home/qbox/Mail/Inbox")
for msg_id, msg in mbox.items():
- if msg.get_subdir() != "new": continue
+ if msg.get_subdir() != "new":
+ continue
mbox.lock()
msg.add_flag("SR")
- msg.set_subdir("cur") # apparently it's not doing so
+ msg.set_subdir("cur") # apparently it's not doing so
mbox[msg_id] = msg
mbox.flush()
mbox.unlock()
parsed_address = parse_address(msg["From"])
+ if not parsed_address:
+ print("WARNING: cannot access emails due to address parsing error")
+ break
from_address = parsed_address[1] + "@" + parsed_address[2]
- if from_address != config.MAPPING[user][0]: continue
- if 'In-Reply-To' not in msg.keys() or not msg['In-Reply-To']:
+ if from_address != config.MAPPING[user][0]:
+ continue
+ if "In-Reply-To" not in msg.keys() or not msg["In-Reply-To"]:
ts = str(time.time())
newmsg = MIMEText(
f"Hello {config.MAPPING[user][3]},\n\nI cannot understand this unsolicited message. If you are trying to reply to a message, be sure to use the ``reply'' feature in your email client to indicate that you are reply to that specific notification of mine. If something sounds wrong, contact your server administrator.\n\nQuestion Box System"
)
- newmsg["From"] = config.MAIL_ADDR
+ newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST
newmsg["To"] = msg["From"]
if msg["Subject"].startswith("Re: "):
newmsg["Subject"] = msg["Subject"]
@@ -159,10 +181,16 @@ def qboard(user):
newmsg["Message-Id"] = "<qbox-system-%s@%s>" % (ts, config.MAIL_HOST)
p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE)
p.communicate(newmsg.as_bytes())
- return # This return was missing
-
- reply_identifier = parse_address(str(msg['In-Reply-To']))[1] # Should be ts
+ continue
+ # return # This return was missing
+
+ parsed_address = parse_address(str(msg["In-Reply-To"]))
+ if not parsed_address:
+ print("WARNING: cannot access emails due to address parsing error")
+ break
+ reply_identifier = parsed_address[1] # Should be ts
for question in reversed(db):
+ assert type(question["ts"]) is str
if reply_identifier == "qbox-" + question["ts"]:
break
else:
@@ -170,7 +198,7 @@ def qboard(user):
newmsg = MIMEText(
f"Hello {config.MAPPING[user][3]},\n\nYou sent me a message, which was supposedly a reply to one of my notifications. However, I do not recognize your In-Reply-To header as one of my Message-IDs, so I don't know how to handle your message. Maybe you could check if you are using your email client's ``reply'' feature correctly, or if the database has been modified to remove the notified post?\n\nThings are getting weird. Contact your server administrator if confused.\n\nQuestion Box System"
)
- newmsg["From"] = config.MAIL_ADDR
+ newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST
newmsg["To"] = msg["From"]
if msg["Subject"].startswith("Re: "):
newmsg["Subject"] = msg["Subject"]
@@ -183,7 +211,7 @@ def qboard(user):
newmsg["Message-Id"] = "<qbox-system-%s@%s>" % (ts, config.MAIL_HOST)
p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE)
p.communicate(newmsg.as_bytes())
- return
+ continue
for part in msg.walk():
if part.get_content_type() == "text/plain":
@@ -197,7 +225,7 @@ def qboard(user):
newmsg = MIMEText(
f"Hello {config.MAPPING[user][3]},\n\nYour reply was in an incorrect format. Please ensure that it includes at least one subpart of MIME type ``text/plain'' (or ``text/html'' but that's not recommended).\n\nQuestion Box System"
)
- newmsg["From"] = config.MAIL_ADDR
+ newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST
newmsg["To"] = msg["From"]
if msg["Subject"].startswith("Re: "):
newmsg["Subject"] = msg["Subject"]
@@ -207,15 +235,21 @@ def qboard(user):
newmsg["In-Reply-To"] = msg["Message-ID"]
elif "Message-Id" in msg.keys():
newmsg["In-Reply-To"] = msg["Message-Id"]
- newmsg["Message-Id"] = "<qbox-system-%s@%s>" % (ts, config.MAIL_HOST)
+ newmsg["Message-Id"] = "<qbox-system-%s@%s>" % (
+ ts,
+ config.MAIL_HOST,
+ )
p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE)
p.communicate(newmsg.as_bytes())
- return
+ continue
- received_message_text_plain = part.get_payload(decode=True).decode("utf-8", "surrogateescape")
+ received_message_text_plain = part.get_payload(decode=True).decode(
+ "utf-8", "surrogateescape"
+ )
if part.get_content_type() == "text/plain":
- received_message_text_decoded = received_message_text_plain.replace("\r\n", "<br />").replace("\n", "<br />")
-
+ received_message_text_decoded = received_message_text_plain.replace(
+ "\r\n", "<br />"
+ ).replace("\n", "<br />")
db.remove(question)
question["a"] = received_message_text_decoded
@@ -225,8 +259,8 @@ def qboard(user):
ts = str(time.time())
newmsg = MIMEText(
f"Hello {config.MAPPING[user][3]},\n\nI have received your message and I added it to the question board.\n\nQuestion Box System"
- )
- newmsg["From"] = config.MAIL_ADDR
+ )
+ newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST
newmsg["To"] = msg["From"]
if msg["Subject"].startswith("Re: "):
newmsg["Subject"] = msg["Subject"]
@@ -239,10 +273,13 @@ def qboard(user):
newmsg["Message-Id"] = "<qbox-system-%s@%s>" % (ts, config.MAIL_HOST)
p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE)
p.communicate(newmsg.as_bytes())
- return render_template('qboard.html',
- username = Markup(config.MAPPING[user][3]),
- pq = Markup(generate_past_questions_from_database(db)))
+ return render_template(
+ "qboard.html",
+ username=Markup(config.MAPPING[user][3]),
+ pq=Markup(generate_past_questions_from_database(db)),
+ )
elif request.method == "POST":
+ assert type(request.content_length) is int
if request.content_length > 1024 * 20:
return "Your request is too large!!!"
ts = str(time.time())
@@ -251,15 +288,15 @@ def qboard(user):
db = load_database(user)
db.append({"q": text, "a": None, "ts": ts})
dump_database(user, db)
- msg = MIMEText(
+ newmsg = MIMEText(
f"Hello {config.MAPPING[user][3]},\n\nThe following message was received in your ({user}'s) question box at server timestamp {ts}. Please reply to this in plain text email, as in the MIME type should be ``text/plain''; you may handwrite HTML in your reply; newlines will be automatically converted to ``<br />''s. Alternatively, HTML email will be accepted but are not recommended. Remember to remove any quoted text if your email client adds these automatically. Attachments will be ignored.\n\n{text}\n\nQuestion Box System"
)
- msg["From"] = config.MAIL_ADDR
- msg["To"] = config.MAPPING[user][0]
- msg["Subject"] = "Question Box Message"
- msg["Message-Id"] = "<qbox-%s@%s>" % (ts, config.MAIL_HOST)
+ newmsg["From"] = config.MAIL_PREFIX + "@" + config.MAIL_HOST
+ newmsg["To"] = config.MAPPING[user][0]
+ newmsg["Subject"] = "Question Box Message"
+ newmsg["Message-Id"] = "<qbox-%s@%s>" % (ts, config.MAIL_HOST)
p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE)
- p.communicate(msg.as_bytes())
+ p.communicate(newmsg.as_bytes())
else:
return "Empty submissions are forbidden."
return "Submission successful.\n\nPlease press the ``back'' button of your browser or otherwise return to the previous page."
@@ -267,10 +304,10 @@ def qboard(user):
@app.route("/", methods=["GET"])
-def index():
- return render_template('home.html',
- userlist = Markup(generate_user_list_from_mapping(config.MAPPING)))
-
+def index() -> Union[Response, werkzeugResponse, str, tuple[str, int]]:
+ return render_template(
+ "home.html", userlist=Markup(generate_user_list_from_mapping(config.MAPPING))
+ )
if __name__ == "__main__":
diff --git a/config.def.py b/config.def.py
index 624cab3..4d3fb2a 100644
--- a/config.def.py
+++ b/config.def.py
@@ -1,14 +1,14 @@
# Copy this to config.py
MAPPING = {
- "username": (
- "email@example.tld",
- "path-to-config.json",
- 'https://homepage.example.tld/',
- 'Full Name'
+ "runxiyu": (
+ "me@runxiyu.org",
+ "runxiyu.json",
+ 'https://runxiyu.org/',
+ 'Runxi Yu'
),
}
PORT = 5728
MAIL_HOST = "andrewyu.org"
-MAIL_ADDR = "qbox@andrewyu.org"
+MAIL_PREFIX = "qbox"