-# Content DB
+# ContentDB
# Copyright (C) 2018 rubenwardy
#
# This program is free software: you can redistribute it and/or modify
from flask import request, flash, abort, redirect
from flask_user import *
from flask_login import login_user, logout_user
-from app.models import *
-from app import app
-import random, string, os
+from .models import *
+from . import app
+import random, string, os, imghdr, user_agents
+from urllib.parse import urljoin
+from werkzeug.datastructures import MultiDict
+
+# These are given to Jinja in template_filters.py
+
+def abs_url_for(path, **kwargs):
+ scheme = "https" if app.config["BASE_URL"][:5] == "https" else "http"
+ return url_for(path, _external=True, _scheme=scheme, **kwargs)
+
+def abs_url(path):
+ return urljoin(app.config["BASE_URL"], path)
+
+def url_set_query(**kwargs):
+ args = MultiDict(request.args)
+
+ for key, value in kwargs.items():
+ if key == "_add":
+ for key2, value_to_add in value.items():
+ values = set(args.getlist(key2))
+ values.add(value_to_add)
+ args.setlist(key2, list(values))
+ elif key == "_remove":
+ for key2, value_to_remove in value.items():
+ values = set(args.getlist(key2))
+ values.discard(value_to_remove)
+ args.setlist(key2, list(values))
+ else:
+ args.setlist(key, [ value ])
+
+
+ dargs = dict(args.lists())
+
+ return url_for(request.endpoint, **dargs)
+
+def get_int_or_abort(v, default=None):
+ if v is None:
+ return default
+
+ try:
+ return int(v or default)
+ except ValueError:
+ abort(400)
+
+def is_user_bot():
+ user_agent = request.headers.get('User-Agent')
+ if user_agent is None:
+ return True
+
+ user_agent = user_agents.parse(user_agent)
+ return user_agent.is_bot
def getExtension(filename):
return filename.rsplit(".", 1)[1].lower() if "." in filename else None
def isFilenameAllowed(filename, exts):
return getExtension(filename) in exts
+ALLOWED_IMAGES = set(["jpeg", "png"])
+def isAllowedImage(data):
+ return imghdr.what(None, data) in ALLOWED_IMAGES
+
def shouldReturnJson():
return "application/json" in request.accept_mimetypes and \
not "text/html" in request.accept_mimetypes
return ''.join(random.choice(string.ascii_lowercase + \
string.ascii_uppercase + string.digits) for _ in range(n))
-def doFileUpload(file, allowedExtensions, fileTypeName):
+def doFileUpload(file, fileType, fileTypeDesc):
if not file or file is None or file.filename == "":
- flash("No selected file", "error")
- return None
+ flash("No selected file", "danger")
+ return None, None
+
+ assert os.path.isdir(app.config["UPLOAD_DIR"]), "UPLOAD_DIR must exist"
+
+ allowedExtensions = []
+ isImage = False
+ if fileType == "image":
+ allowedExtensions = ["jpg", "jpeg", "png"]
+ isImage = True
+ elif fileType == "zip":
+ allowedExtensions = ["zip"]
+ else:
+ raise Exception("Invalid fileType")
ext = getExtension(file.filename)
if ext is None or not ext in allowedExtensions:
- flash("Please upload load " + fileTypeName, "error")
- return None
+ flash("Please upload " + fileTypeDesc, "danger")
+ return None, None
+
+ if isImage and not isAllowedImage(file.stream.read()):
+ flash("Uploaded image isn't actually an image", "danger")
+ return None, None
+
+ file.stream.seek(0)
filename = randomString(10) + "." + ext
- file.save(os.path.join("app/public/uploads", filename))
- return "/uploads/" + filename
+ filepath = os.path.join(app.config["UPLOAD_DIR"], filename)
+ file.save(filepath)
+ return "/uploads/" + filename, filepath
def make_flask_user_password(plaintext_str):
# http://passlib.readthedocs.io/en/stable/modular_crypt_format.html
else:
return password.decode("UTF-8")
-def _do_login_user(user, remember_me=False):
+def loginUser(user):
def _call_or_get(v):
if callable(v):
return v()
return False
if user.rank == UserRank.BANNED:
- flash("You have been banned.", "error")
+ flash("You have been banned.", "danger")
return False
user.active = True
# Check if user account has been disabled
if not _call_or_get(user.is_active):
- flash("Your account has not been enabled.", "error")
- return False
-
- # Check if user has a confirmed email address
- user_manager = current_app.user_manager
- if user_manager.enable_email and user_manager.enable_confirm_email \
- and not current_app.user_manager.enable_login_without_confirm_email \
- and not user.has_confirmed_email():
- url = url_for("user.resend_confirm_email")
- flash("Your email address has not yet been confirmed", "error")
+ flash("Your account has not been enabled.", "danger")
return False
# Use Flask-Login to sign in user
- login_user(user, remember=remember_me)
+ login_user(user, remember=True)
signals.user_logged_in.send(current_app._get_current_object(), user=user)
flash("You have signed in successfully.", "success")
return True
-def loginUser(user):
- user_mixin = None
- if user_manager.enable_username:
- user_mixin = user_manager.find_user_by_username(user.username)
-
- return _do_login_user(user_mixin, True)
def rank_required(rank):
def decorator(f):
def getPackageByInfo(author, name):
user = User.query.filter_by(username=author).first()
if user is None:
- abort(404)
+ return None
package = Package.query.filter_by(name=name, author_id=user.id, soft_deleted=False).first()
if package is None:
- abort(404)
+ return None
return package
if not ("author" in kwargs and "name" in kwargs):
abort(400)
- package = getPackageByInfo(kwargs["author"], kwargs["name"])
+ author = kwargs["author"]
+ name = kwargs["name"]
+
+ package = getPackageByInfo(author, name)
+ if package is None:
+ package = getPackageByInfo(author, name + "_game")
+ if package is None or package.type != PackageType.GAME:
+ abort(404)
+
+ args = dict(kwargs)
+ args["name"] = name + "_game"
+ return redirect(url_for(request.endpoint, **args))
del kwargs["author"]
del kwargs["name"]
return decorated_function
-def triggerNotif(owner, causer, title, url):
- if owner.rank.atLeast(UserRank.NEW_MEMBER) and owner != causer:
- Notification.query.filter_by(user=owner, url=url).delete()
- notif = Notification(owner, causer, title, url)
+
+def addNotification(target, causer, title, url, package=None):
+ try:
+ iter(target)
+ for x in target:
+ addNotification(x, causer, title, url, package)
+ return
+ except TypeError:
+ pass
+
+ if target.rank.atLeast(UserRank.NEW_MEMBER) and target != causer:
+ Notification.query.filter_by(user=target, causer=causer, title=title, url=url, package=package).delete()
+ notif = Notification(target, causer, title, url, package)
db.session.add(notif)
+
+def addAuditLog(severity, causer, title, url, package=None, description=None):
+ entry = AuditLogEntry(causer, severity, title, url, package, description)
+ db.session.add(entry)
+
+
def clearNotifications(url):
if current_user.is_authenticated:
Notification.query.filter_by(user=current_user, url=url).delete()
def isNo(val):
return val and not isYes(val)
+
+def nonEmptyOrNone(str):
+ if str is None or str == "":
+ return None
+
+ return str