from flask import Flask, redirect, url_for, render_template, render_template_string, request, send_file, send_from_directory, session import yaml, cv2 from PIL import Image from datetime import date, datetime import time from pprint import pprint from random import choice from os.path import exists from os import remove import re import marklite from hashlib import sha256 import hwip from io import BytesIO from base64 import b64encode from nceicon import open_cei app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024 app.secret_key = 'Glyphoglossus Molossus' app.config['SESSION_TYPE'] = 'filesystem' ### funs to encode cei files for pyodide def pad(to_pad, n): while len(to_pad)") def serve_img_noko(text): img = hwip.sign(text) img_io = BytesIO() img.save(img_io, "PNG") img_io.seek(0) dataurl = '' return dataurl @app.route("/ip/") def get_ip(): return process_ip(request) def process_ip(request): if request.environ.get('HTTP_X_FORWARDED_FOR') is None: addr = request.environ['REMOTE_ADDR'] else: addr = request.environ['HTTP_X_FORWARDED_FOR'] aaddr = addr.split(",")[0] try: obj = bytes([int(i) for i in aaddr.split(".")]) sha = sha256(obj).hexdigest() except Exception: obj = bytes(aaddr, "utf-8") sha = sha256(obj).hexdigest() return sha @app.route("/favicon.ico/") def favicon(): return render_template_string("""""") @app.route("/") def io_manager_index(): data = load("templates/index.html") prev = "" post_list = [] if exists("preview.yml"): with open("preview.yml") as f: post_list = yaml.safe_load(f.read()) for i in range(len(post_list)): post_list[i]["text"] = post_list[i].get("text") if len(post_list[i].get("text"))<81 else post_list[i].get("text")[:80-3]+"..." return render_template_string(data, preview=post_list) @app.route("/getcei/") def getcei(fname): path = f"cei/{fname}" if exists(path): return send_file(path, as_attachment=True) else: return "Error: Invalid Filename" @app.route("/liteshare/") def liteshare(fname): path = f"liteshare/{fname}" if exists(path): return send_file(path, as_attachment=True) else: return "Error: Invalid Filename" @app.route("/admin/") def admin_actions(input): series = input.split("-") if series[0]=="setadmin": password = series[1] if password==load("admin.txt"): with open("admin.yml", "w") as f: f.write(yaml.dump({"admin": process_ip(request)})) elif password==load("typh.txt"): d = yaml.safe_load(load("admin.yml")) d["typh"] = process_ip(request) with open("admin.yml", "w") as f: f.write(yaml.dump(d)) elif series[0]=="ban": d = yaml.safe_load(load("admin.yml")) hurts_when_ip = [d[key] for key in list(d.keys())] if process_ip(request) in hurts_when_ip: command, thread, post = series thread, post = [int(i) for i in [thread, post]] my_list = yaml.safe_load(load(f"{thread}.tp"))["posts"]+yaml.safe_load(load(f"{thread}.tp"))["pinned"] post_data = [i for i in my_list if i["id"]==post][0] expires = int(time.time()) + 60*60*24*3 if not exists("banned.yml"): with open("banned.yml", "w") as f: f.write(yaml.dump({"test": 0})) blacklist = yaml.safe_load(load("banned.yml")) blacklist[post_data["alias"]] = expires with open("banned.yml", "w") as f: f.write(yaml.dump(blacklist)) else: return redirect("/post/ran-6-1") elif series[0]=="del": tpdict = yaml.safe_load(load(f"{series[1]}.tp")) tplist = tpdict["posts"] postdict = [p for p in tplist if p["id"]==int(series[2])][0] d = yaml.safe_load(load("admin.yml")) hurts_when_ip = [d[key] for key in list(d.keys())] + [postdict["alias"]] #hurts_when_ip = [yaml.safe_load(load("admin.yml"))["admin"],postdict["alias"]] if not process_ip(request) in hurts_when_ip: return redirect("/post/ran-6-1") else: tpdict["posts"] = [p for p in tplist if p["id"]!=int(series[2])] with open(f"{series[1]}.tp", "w") as f: f.write(yaml.dump(tpdict)) if postdict["media"]: for key in ["media","preview","thumbnail"]: try: remove(postdict[key]) except Exception: pass return redirect("/post/ran-5-1") @app.route("/post/") def post_info(info): board, action, link = info.split("-",2) additional = "

" if action=="4" else "" actions = {"1": "Posted Successfully", "2": "Incorrect Captcha", "3": "Unsupported File Format", "4": "You're banned", "5": "Admin action issued", "6": "Error Issuing the Admin Command"} action = actions[action] post = load("templates/post.html") return render_template_string(post.replace("{{additional}}", additional), action=action, link="/"+board+"/"+link), {"Refresh": "3; url="+"/"+board+"/"+link} @app.route("/cei/") def load_cei(file_path): file_path = f"cei/{file_path}" cei = bytes_to_string(open(file_path,"rb").read())#str(list(open(file_path,"rb").read())) template = """

Loading modules...

Download Get the CEI Viewer """ return render_template_string(template) @app.route("//", methods=['post', 'get']) def io_manager_board(p1): params = p1+"/" r = Request(token = process_ip(request), captcha = request.form.get('security_id'), content = request.form.get('content'), file = request.files.get('my_file'), type = request.method, params = params) return r.process() @app.route("//", methods=['post', 'get']) def io_manager_thread(p1, p2): params = p1+"/"+p2 r = Request(token = process_ip(request), captcha = request.form.get('security_id'), content = request.form.get('content'), file = request.files.get('my_file'), type = request.method, params = params) return r.process() class Request: def __init__(self, token, captcha, content, file, type, params): self.token = token self.captcha = captcha self.content = content self.file = file self.type = type self.params = params pdict = parse_params(self.params) self.pdict = pdict if pdict["type"]=="frontpage": self.page = "" elif pdict["type"]=="board": self.page = Thread(self) elif pdict["type"]=="catalog": self.page = Catalog(self) elif pdict["type"]=="thread": self.page = Thread(self) elif pdict["type"]=="post": self.page = Post(self) def process(self): if self.type=="POST": return self.make_post_new() return render_template_string(self.page.render()) def make_post(self): if self.pdict["type"]=="board": fname = f"{get_post_number()}.tp" me, pr, th = process_media(self.file) increase_post_number() d = {"alias": self.token, "id": get_post_number(), "name": "Anonymous", "thumbnail": th, "preview": pr, "media": me, "text": self.content, "date": str(date.today().strftime("%d/%m/%Y"))+" "+datetime.now().strftime("%H:%M:%S")} bfile = self.pdict["board"]+".b" with open(bfile, "r") as f: thread_list = yaml.safe_load(f.read()) thread_list["posts"] = [fname]+thread_list["posts"] if thread_list==None: thread_list = [] with open(bfile, "w") as f: f.write(yaml.dump(thread_list)) with open(fname, "w") as f: f.write(yaml.dump([d])) return_board = self.pdict["board"] return_thread = "" elif self.pdict["type"]=="thread": fname = f"{self.pdict['index']}.tp" me, pr, th = process_media(self.file) increase_post_number() d = {"alias": self.token, "id": get_post_number(), "name": "Anonymous", "thumbnail": th, "preview": pr, "media": me, "text": self.content, "date": str(date.today().strftime("%d/%m/%Y"))+" "+datetime.now().strftime("%H:%M:%S")} print("fname:",fname) with open(fname, "r") as f: tp = yaml.safe_load(f.read()) tp["posts"]+=[d] with open(fname, "w") as f: f.write(yaml.dump(tp)) return_board = self.pdict["board"] return_thread = self.pdict["thread"] return redirect("/post/"+f"{return_board}-1-{return_thread}") def make_post_new(self): if exists("banned.yml"): banned = yaml.safe_load(load("banned.yml")) if self.token in list(banned.keys()): if time.time() < banned[self.token]: return_board = self.pdict["board"] return_thread = self.pdict["thread"] return redirect("/post/"+f"{return_board}-4-{return_thread}") else: banned.pop(self.token) with open("banned.yml", "w") as f: f.write(yaml.dump(banned)) if self.pdict["type"] == "board": if not "legacy": fname = f"{get_post_number()}.tp" me, pr, th = process_media(self.file) increase_post_number() d = {"alias": self.token, "id": get_post_number(), "name": "Anonymous", "thumbnail": th, "preview": pr, "media": me, "text": self.content, "date": str(date.today().strftime("%d/%m/%Y"))+" "+datetime.now().strftime("%H:%M:%S"), "board": self.pdict["board"] } bfile = self.pdict["board"]+".b" with open(bfile, "r") as f: thread_list = yaml.safe_load(f.read()) thread_list["posts"] = [fname]+thread_list["posts"] if thread_list==None: thread_list = [] with open(bfile, "w") as f: f.write(yaml.dump(thread_list)) with open(fname, "w") as f: f.write(yaml.dump([d])) return_board = self.pdict["board"] return_thread = "" else: fname = f"{get_post_number()}.tp" me, pr, th = process_media(self.file) increase_post_number() d = {"alias": self.token, "id": get_post_number(), "name": "Anonymous", "thumbnail": th, "preview": pr, "media": me, "text": self.content, "date": str(date.today().strftime("%d/%m/%Y"))+" "+datetime.now().strftime("%H:%M:%S"), "board": self.pdict["board"] } bfile = self.pdict["board"]+".b" with open(bfile, "r") as f: thread_dict = yaml.safe_load(f.read()) if thread_dict==None: thread_dict = {"pinned": [], "posts": []} thread_dict["posts"] = [fname]+thread_dict["posts"] with open(bfile, "w") as f: f.write(yaml.dump(thread_dict)) with open(fname, "w") as f: f.write(yaml.dump({"pinned": [d],"posts": []})) return_board = self.pdict["board"] return_thread = "" elif self.pdict["type"]=="thread": fname = f"{self.pdict['index']}.tp" self.bump(f"{self.pdict['board']}.b",fname) me, pr, th = process_media(self.file) increase_post_number() d = {"alias": self.token, "id": get_post_number(), "name": "Anonymous", "thumbnail": th, "preview": pr, "media": me, "text": self.content, "date": str(date.today().strftime("%d/%m/%Y"))+" "+datetime.now().strftime("%H:%M:%S"), "board": self.pdict["board"], "thread": self.pdict["index"] } with open(fname, "r") as f: tp = yaml.safe_load(f.read()) tp["posts"]+=[d] with open(fname, "w") as f: f.write(yaml.dump(tp)) return_board = self.pdict["board"] return_thread = self.pdict["thread"] if not "nsfw" in self.pdict["board"]:#preview in main page modification if exists("preview.yml"): base_list = yaml.safe_load(open("preview.yml").read()) else: base_list = [] base_list.append(d) if len(base_list)>3: base_list.pop(0) with open("preview.yml", "w") as f: f.write(yaml.dump(base_list)) return redirect("/post/"+f"{return_board}-1-{return_thread}") def bump(self, bname, tname): board_dict = yaml.safe_load(load(bname)) if tname in board_dict["pinned"]: board_dict["pinned"].pop(board_dict["pinned"].index(tname)) board_dict["pinned"] = [tname] + board_dict["pinned"] elif tname in board_dict["posts"]: board_dict["posts"].pop(board_dict["posts"].index(tname)) board_dict["posts"] = [tname] + board_dict["posts"] with open(bname, "w") as f: f.write(yaml.dump(board_dict)) class Catalog: def __init__(self, master): self.master = master def render(self): with open("templates/catalog.html") as f: base = f.read() row = " {} {} {} {} " if self.master.pdict["type"]=="catalog": post_info_names = get_posts_from_yaml(f"{self.master.pdict['board']}.b") post_info = [merge(get_posts_from_yaml(post_info_name)[0] , {"thread": post_info_name.split(".")[0]}) for post_info_name in post_info_names] rows = [] while post_info: four = [] for _ in range(4): if post_info: four.append(Post(self, post_info.pop(0)).render_for_catalog()) else: four.append("") rows.append(row.format(*four)) return render_template_string( load("templates/anonymous6.html").replace("{{board}}",self.get_catalog()).replace("{{tables}}",render_template_string(base.replace("{{TABLE_ROWS}}","\n".join(rows)))), dt = datetime.now().strftime('%d/%m/%Y %H:%M:%S')+random_arquee(), board_title = yaml.safe_load(load(f"{self.master.pdict['board']}.b")).get("title")) def get_catalog(self): html = f""" /{self.master.pdict["board"]}/ """ return html class Thread: def __init__(self, master): self.master = master def render(self): with open("templates/anonymous6.html") as f: base = f.read().replace("{{board}}",self.get_catalog()) if self.master.pdict["type"]=="board": post_info_names = get_posts_from_yaml(f"{self.master.pdict['board']}.b")#yaml.safe_load(load(f"{self.master.pdict['board']}.b")) post_info = [merge(get_posts_from_yaml(post_info_name)[0] , {"thread": post_info_name.split(".")[0]}) for post_info_name in post_info_names] elif self.master.pdict["type"]=="thread": post_info = [merge(pi , {"thread": f"{self.master.pdict['thread']}"}) for pi in get_posts_from_yaml(f"{self.master.pdict['thread']}.tp")] return render_template_string(base.replace("{{tables}}","\n".join([Post(self, pi).render() for pi in post_info]+self.get_login())), dt = datetime.now().strftime('%d/%m/%Y %H:%M:%S')+random_arquee(), board_title = yaml.safe_load(load(f"{self.master.pdict['board']}.b")).get("title")) def get_login(self): login = """

{% block captcha %}{% if """+str(captcha_enabled())+""" %}{% endif %}"""+"""{% endblock captcha %}

"""+"""{% block log %}{% if """+str(captcha_enabled())+""" %}{% endif %}{% endblock log %}"""+"""

{div}

""".format(ta = """onKeyDown="textCounter(this,'progressbar1',2000)" onKeyUp="textCounter(this,'progressbar1',2000)" onFocus="textCounter(this,'progressbar1',2000)""", div = """
""") return [login] def get_catalog(self): html = f""" /{self.master.pdict["board"]}/ """ return html class Post: def __init__(self, master, info): #print("Info:", info["thread"]) self.info = info self.master = master def render(self): text = self.info["text"] text = emojis(text) dots = ''''''.format(self.info["thread"],self.info["id"],"/"+self.info["media"],self.info["thread"],self.info["id"],self.info["id"])#board if 0: regular = """

{text}

"""#{thread_link} #7C7C7C green = """

>{text}

""" base_link = """{text}""" title_form = """

{title}

""" processed_text = "" focus = 0 while focus\n": if text[focus]=="#": start = focus+1 end = start while end": end = focus+1 while end": processed_text+=text[focus] focus+=1 else: processed_text+=text[focus] focus+=1 else: processed_text = marklite.render(text, {"board":self.master.master.pdict['board'],"thread":self.info['thread']}) self.is_video = False########## #processed_text = emojis(processed_text) self.table_info = {"div_id": self.info["id"], "x": 200, "y": 200, "image": self.get_popup(), "headercolor": "#ecddbe", "bgcolor": "#fff", "dots": dots, "name": self.info["name"], "date": time_label(self.info["date"]), "number": self.get_number(), "content": processed_text} table = self.get_table() return table def get_number(self): inside = "#"+str(self.info["id"]) base = """{text}""" return base.format(thread_link = f"/{self.master.master.pdict['board']}/{self.info['thread']}", text = inside) def render_for_catalog(self): regular = """{text}"""#{thread_link} #7C7C7C green = """

>{text}

""" base_link = """{text}""" title_form = """

{title}

""" base = load("templates/catalog_post.html") text = self.info["text"]# if len(self.info["text"])<25 else self.info["text"][:25] self.info["board"] = self.master.master.pdict["board"] #text = emojis(text) processed_text = "" focus = 0 while focus\n": if text[focus]=="#": start = focus+1 end = start+1 while end
{dots}{name}
{date}{number}
{image}

{content}

""".format(**self.table_info) return table def get_popup(self): print(self.info["media"]) is_video = (self.info["media"].lower().endswith(".mp4") or self.info["media"].lower().endswith(".webm")) if not is_video: template = load("templates/popup_universal.html") print("image") else: print("video") template = load("templates/popup_universal.html") if self.info["media"]: if not is_video: return render_template_string(template, hash_id = self.info["id"], thumbnail = "/"+self.info["thumbnail"], preview = "/"+self.info["preview"], fname = self.info["media"], original_link = "/"+self.info["media"]) else: return render_template_string(template, hash_id = self.info["id"], thumbnail = "/"+self.info["thumbnail"], preview = "/"+self.info["media"], fname = self.info["media"], original_link = "/"+self.info["media"]) else: return "" def get_posts_from_yaml(fname): try: data = yaml.safe_load(load(fname)) if isinstance(data, dict): if data["pinned"]: return data["pinned"]+data["posts"] else: return data["posts"] elif isinstance(data, list): return data else: raise TypeError("") except Exception as e: return [] def random_arquee(): with open("marquee.data") as f: data = yaml.safe_load(f.read()) return " «"+render_template_string(choice(data))+"»" def time_label(data): html = """ {short} {full_}""".replace("{short}",data.split(" ")[0]).replace("{full_}", data)#.format(short=data.split(" ")[0], full=data) return html def serve_captcha(filename): return "" def captcha_enabled(): return False def get_popup_legacy(pngs): one, two, three = [load("templates/"+i) for i in ["pop1.html","pop4.html","pop3.html"]] images = [] for png in pngs: tag = "video" if ".mp4" in png else "img" if ".png" in png: images.append(render_template_string(two, item_hash = str(abs(hash(png))), file_jpg = png.replace(".png",".jpg"), uri=serve_pil_image("static/"+png), png=png, full="/static/"+png.split(".")[0]+".png", tag=tag)) else: images.append(render_template_string(two, item_hash = str(abs(hash(png))), file_jpg = png.replace(".png",".jpg"), uri=serve_pil_image("static/"+png), png=png, full="/static/"+png.split(".")[0]+".png", tag=tag)) return one, images[0], three#"\n".join([one,*images,three]) def get_popup_video_legacy(pngs): one, two, three = [load("templates/"+i) for i in ["pop1.html","pop5.html","pop3.html"]] images = [] for png in pngs: tag = "video" if ".mp4" in png else "img" if ".mp4" in png or 1: images.append(render_template_string(two, item_hash = str(abs(hash(png))), file_jpg = png.replace(".mp4",".jpg"), uri=serve_pil_image("static/"+png), png=png, full="/static/"+png.split(".")[0]+".mp4", tag = tag)) else: images.append(render_template_string(two, item_hash = str(abs(hash(png))), file_jpg = png.replace(".mp4",".jpg"))) return one, images[0], three#"\n".join([one,*images,three]) def load(filename): with open(filename) as f: output = f.read() return output def emojis(text): sequence = text.split(":") if len(sequence)<3: return text for n in range(1,len(sequence),2): if set(sequence[n]).issubset(set("_qwertyuiopasdfghjklzxcvbnm0123456789")): sequence[n] = '''''' else: sequence[n] = ":"+sequence[n]+":" return "".join(sequence) def emojis2(text): pattern_list = re.findall("(?<=:)[a-zA-Z]+(?=:)", text) for pat in pattern_list: print(pat) if exists(f"static/emoji.{pat}.png"): emoji = '''''' text = text.replace(f":{pat}:", emoji, 1) else: pass return text def parse_params(params): plist = params.split("/") d = dict() if len(plist)==1 and not plist[0]:#frontpage d["type"] = "frontpage" elif len(plist)==1:#board d["type"] = "board" d["board"] = plist[0] elif len(plist)==2:#thread or catalog if plist[-1].isnumeric(): d["type"] = "thread" d["thread"] = plist[1] d["index"] = plist[-1] elif plist[-1]=="catalog": d["type"] = "catalog" else: d["type"] = "board" d["board"] = plist[0] else:#post d["thread"] = plist[1] d["post"] = plist[2] d["type"] = "post" print(d) return d def get_post_number(): with open("data/post_number.data") as f: n = int(f.read()) return n def increase_post_number(): n = get_post_number()+1 with open("data/post_number.data", "w") as f: f.write(str(n)) def to_pil(imgOpenCV): return Image.fromarray(cv2.cvtColor(imgOpenCV, cv2.COLOR_BGR2RGB)) def video_thumbnail(fname): vidcap = cv2.VideoCapture(fname) video_length = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) success,image = vidcap.read() count = 0 while success and count<0.25*video_length: success, image = vidcap.read() count += 1 pimg = to_pil(image) pimg.thumbnail((200,200)) return pimg def process_media(file): fname = "static/"+file.filename ext = fname.split(".")[-1].lower() supported_images = ["png", "jpg", "jpeg", "jpp", "gif", "webp", "apng", "tiff"] supported_videos = ["mp4", "webm"] supported_formats = supported_images + supported_videos + ["cei"] if not ext in supported_formats: return "", "", "" else: if ext in supported_images: file.save(fname) original = Image.open(fname) preview = fname+".preview.jpg" original.convert("RGB").save(preview, quality = 90, optimize = True, progressive = True) thumbnail = fname+".thumbnail.jpg" th = original.copy().convert("RGB") th.thumbnail((200,200)) th.save(thumbnail, quality = 75, optimize = True, progressive = True) elif ext=="cei": fname = "cei/"+file.filename file.save(fname) original = open_cei(fname, True)[0] preview = "static/"+file.filename+".preview.jpg" original.convert("RGB").save(preview, quality = 90, optimize = True, progressive = True) thumbnail = "static/"+file.filename+".thumbnail.jpg" th = original.copy().convert("RGB") th.thumbnail((200,200)) th.save(thumbnail, quality = 75, optimize = True, progressive = True) elif ext in supported_videos: file.save(fname) preview = fname th = video_thumbnail(fname) thumbnail = fname+".thumbnail.jpg" th.save(thumbnail, quality = 75, optimize = True, progressive = True) return fname, preview, thumbnail def ip_name(ip): originals = list(".1234567890") modified = list("qwertyuiopa") name = "" for character in ip: name += modified[originals.index(character)] return str(abs(hash(name))) def merge(dict1, dict2): dict3 = dict1.copy() dict3.update(dict2) return dict3 def tag_correction(html): htmlist = html.split("") for n in range(len(htmlist)): element = htmlist[n] important = element.split("")[0] if "","",1) htmlist[n] = element elif "".join(htmlist) """ - alias: 8e3qe31u4up91451 date: 08/07/2022 10:44:44 id: '914' image: 4730.png name: Anonymous text: <3 """ if __name__ == "__main__": app.run(threaded=True)#http://127.0.0.1:5000/ran/1