X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=app%2Futils.py;h=fb698856698dcd19123d657503eead379eaaa660;hb=92fb54556ad0409a519c308e7e34b63e11621903;hp=fd36392664e054a7cb89229ff406a04e30372394;hpb=18b9fb38766260aabeab5ce1cd98c7f49d57c91d;p=cheatdb.git diff --git a/app/utils.py b/app/utils.py index fd36392..fb69885 100644 --- a/app/utils.py +++ b/app/utils.py @@ -1,4 +1,4 @@ -# Content DB +# ContentDB # Copyright (C) 2018 rubenwardy # # This program is free software: you can redistribute it and/or modify @@ -18,9 +18,59 @@ from flask import request, flash, abort, redirect from flask_user import * from flask_login import login_user, logout_user -from app.models import * -from app import app -import random, string, os, imghdr +from .models import * +from . import app +import random, string, os, imghdr, user_agents +from urllib.parse import urljoin +from werkzeug.datastructures import MultiDict + +# These are given to Jinja in template_filters.py + +def abs_url_for(path, **kwargs): + scheme = "https" if app.config["BASE_URL"][:5] == "https" else "http" + return url_for(path, _external=True, _scheme=scheme, **kwargs) + +def abs_url(path): + return urljoin(app.config["BASE_URL"], path) + +def url_set_query(**kwargs): + args = MultiDict(request.args) + + for key, value in kwargs.items(): + if key == "_add": + for key2, value_to_add in value.items(): + values = set(args.getlist(key2)) + values.add(value_to_add) + args.setlist(key2, list(values)) + elif key == "_remove": + for key2, value_to_remove in value.items(): + values = set(args.getlist(key2)) + values.discard(value_to_remove) + args.setlist(key2, list(values)) + else: + args.setlist(key, [ value ]) + + + dargs = dict(args.lists()) + + return url_for(request.endpoint, **dargs) + +def get_int_or_abort(v, default=None): + if v is None: + return default + + try: + return int(v or default) + except ValueError: + abort(400) + +def is_user_bot(): + user_agent = request.headers.get('User-Agent') + if user_agent is None: + return True + + user_agent = user_agents.parse(user_agent) + return user_agent.is_bot def getExtension(filename): return filename.rsplit(".", 1)[1].lower() if "." in filename else None @@ -42,8 +92,10 @@ def randomString(n): def doFileUpload(file, fileType, fileTypeDesc): if not file or file is None or file.filename == "": - flash("No selected file", "error") - return None + flash("No selected file", "danger") + return None, None + + assert os.path.isdir(app.config["UPLOAD_DIR"]), "UPLOAD_DIR must exist" allowedExtensions = [] isImage = False @@ -57,18 +109,19 @@ def doFileUpload(file, fileType, fileTypeDesc): ext = getExtension(file.filename) if ext is None or not ext in allowedExtensions: - flash("Please upload load " + fileTypeDesc, "danger") - return None + flash("Please upload " + fileTypeDesc, "danger") + return None, None if isImage and not isAllowedImage(file.stream.read()): flash("Uploaded image isn't actually an image", "danger") - return None + return None, None file.stream.seek(0) filename = randomString(10) + "." + ext - file.save(os.path.join("app/public/uploads", filename)) - return "/uploads/" + filename + filepath = os.path.join(app.config["UPLOAD_DIR"], filename) + file.save(filepath) + return "/uploads/" + filename, filepath def make_flask_user_password(plaintext_str): # http://passlib.readthedocs.io/en/stable/modular_crypt_format.html @@ -93,7 +146,7 @@ def make_flask_user_password(plaintext_str): else: return password.decode("UTF-8") -def _do_login_user(user, remember_me=False): +def loginUser(user): def _call_or_get(v): if callable(v): return v() @@ -105,7 +158,7 @@ def _do_login_user(user, remember_me=False): return False if user.rank == UserRank.BANNED: - flash("You have been banned.", "error") + flash("You have been banned.", "danger") return False user.active = True @@ -116,32 +169,17 @@ def _do_login_user(user, remember_me=False): # Check if user account has been disabled if not _call_or_get(user.is_active): - flash("Your account has not been enabled.", "error") - return False - - # Check if user has a confirmed email address - user_manager = current_app.user_manager - if user_manager.enable_email and user_manager.enable_confirm_email \ - and not current_app.user_manager.enable_login_without_confirm_email \ - and not user.has_confirmed_email(): - url = url_for("user.resend_confirm_email") - flash("Your email address has not yet been confirmed", "error") + flash("Your account has not been enabled.", "danger") return False # Use Flask-Login to sign in user - login_user(user, remember=remember_me) + login_user(user, remember=True) signals.user_logged_in.send(current_app._get_current_object(), user=user) flash("You have signed in successfully.", "success") return True -def loginUser(user): - user_mixin = None - if user_manager.enable_username: - user_mixin = user_manager.find_user_by_username(user.username) - - return _do_login_user(user_mixin, True) def rank_required(rank): def decorator(f): @@ -160,11 +198,12 @@ def rank_required(rank): def getPackageByInfo(author, name): user = User.query.filter_by(username=author).first() if user is None: - abort(404) + return None - package = Package.query.filter_by(name=name, author_id=user.id, soft_deleted=False).first() + package = Package.query.filter_by(name=name, author_id=user.id) \ + .filter(Package.state!=PackageState.DELETED).first() if package is None: - abort(404) + return None return package @@ -174,7 +213,18 @@ def is_package_page(f): if not ("author" in kwargs and "name" in kwargs): abort(400) - package = getPackageByInfo(kwargs["author"], kwargs["name"]) + author = kwargs["author"] + name = kwargs["name"] + + package = getPackageByInfo(author, name) + if package is None: + package = getPackageByInfo(author, name + "_game") + if package is None or package.type != PackageType.GAME: + abort(404) + + args = dict(kwargs) + args["name"] = name + "_game" + return redirect(url_for(request.endpoint, **args)) del kwargs["author"] del kwargs["name"] @@ -183,12 +233,27 @@ def is_package_page(f): return decorated_function -def triggerNotif(owner, causer, title, url): - if owner.rank.atLeast(UserRank.NEW_MEMBER) and owner != causer: - Notification.query.filter_by(user=owner, url=url).delete() - notif = Notification(owner, causer, title, url) + +def addNotification(target, causer, title, url, package=None): + try: + iter(target) + for x in target: + addNotification(x, causer, title, url, package) + return + except TypeError: + pass + + if target.rank.atLeast(UserRank.NEW_MEMBER) and target != causer: + Notification.query.filter_by(user=target, causer=causer, title=title, url=url, package=package).delete() + notif = Notification(target, causer, title, url, package) db.session.add(notif) + +def addAuditLog(severity, causer, title, url, package=None, description=None): + entry = AuditLogEntry(causer, severity, title, url, package, description) + db.session.add(entry) + + def clearNotifications(url): if current_user.is_authenticated: Notification.query.filter_by(user=current_user, url=url).delete() @@ -203,3 +268,9 @@ def isYes(val): def isNo(val): return val and not isYes(val) + +def nonEmptyOrNone(str): + if str is None or str == "": + return None + + return str