]> git.lizzy.rs Git - cheatdb.git/blob - app/utils.py
Fix crash on null user agent
[cheatdb.git] / app / utils.py
1 # ContentDB
2 # Copyright (C) 2018  rubenwardy
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17
18 from flask import request, flash, abort, redirect
19 from flask_user import *
20 from flask_login import login_user, logout_user
21 from .models import *
22 from . import app
23 import random, string, os, imghdr, user_agents
24 from urllib.parse import urljoin
25 from werkzeug.datastructures import MultiDict
26
27 # These are given to Jinja in template_filters.py
28
def abs_url_for(path, **kwargs):
    """Build an absolute URL for an endpoint using BASE_URL's scheme.

    ``path`` is passed to ``url_for`` as the endpoint; any extra keyword
    arguments are forwarded unchanged.
    """
    base_url = app.config["BASE_URL"]
    # Only the first five characters of BASE_URL are inspected; anything
    # that does not begin with "https" is served as plain "http".
    if base_url[:5] == "https":
        scheme = "https"
    else:
        scheme = "http"
    return url_for(path, _external=True, _scheme=scheme, **kwargs)
32
def abs_url(path):
    """Resolve ``path`` against the configured BASE_URL using ``urljoin``."""
    base_url = app.config["BASE_URL"]
    return urljoin(base_url, path)
35
def url_set_query(**kwargs):
    """Return a URL for the current endpoint with an adjusted query string.

    Starts from a copy of the current request's query arguments and applies
    each keyword argument in turn:

    * ``_add``: a mapping of parameter -> value; the value is added to the
      values already present for that parameter.
    * ``_remove``: a mapping of parameter -> value; the value is dropped
      from that parameter's values if present.
    * anything else: the parameter is replaced by the single given value.

    ``_add`` and ``_remove`` round-trip the values through a ``set``, so
    duplicates collapse and the resulting order is not guaranteed.
    """
    query = MultiDict(request.args)

    for name, change in kwargs.items():
        if name == "_add":
            for param, item in change.items():
                current = set(query.getlist(param))
                current.add(item)
                query.setlist(param, list(current))
            continue

        if name == "_remove":
            for param, item in change.items():
                current = set(query.getlist(param))
                current.discard(item)
                query.setlist(param, list(current))
            continue

        # Plain keyword: overwrite whatever was there with one value.
        query.setlist(name, [ change ])

    return url_for(request.endpoint, **dict(query.lists()))
57
def get_int_or_abort(v, default=None):
    """Convert ``v`` to an int, aborting the request with 400 if it is not one.

    Args:
        v: Raw value to convert (e.g. a query-string or form field).
        default: Fallback used when ``v`` is None or otherwise falsy, such
            as an empty string.

    Returns:
        ``default`` unchanged when ``v`` is None. Otherwise the integer
        value of ``v``, or of ``default`` when ``v`` is falsy. When ``v``
        is falsy and there is no default, None is returned.

    A value that cannot be converted triggers ``abort(400)``.
    """
    if v is None:
        return default

    candidate = v or default
    if candidate is None:
        # An empty value with no default (e.g. "?page=") used to reach
        # int(None) and fail with an uncaught TypeError; treat it the same
        # as a missing value instead.
        return None

    try:
        return int(candidate)
    except (ValueError, TypeError):
        # TypeError covers non-string, non-numeric inputs that int() rejects.
        abort(400)
66
def is_user_bot():
    """Report whether the current request appears to come from a bot.

    A request with no User-Agent header is treated as a bot; otherwise the
    header is parsed with ``user_agents`` and its ``is_bot`` flag returned.
    """
    raw_agent = request.headers.get('User-Agent')
    if raw_agent is None:
        return True

    return user_agents.parse(raw_agent).is_bot
74
def getExtension(filename):
    """Return the lower-cased text after the last dot, or None without a dot.

    A name that ends in a dot yields an empty string.
    """
    _, dot, extension = filename.rpartition(".")
    if not dot:
        return None
    return extension.lower()
77
def isFilenameAllowed(filename, exts):
    """Check whether the filename's extension is one of ``exts``."""
    extension = getExtension(filename)
    return extension in exts
80
ALLOWED_IMAGES = {"jpeg", "png"}

# Leading byte signatures of the image formats this module can recognise.
# Checked directly rather than through ``imghdr``, which is deprecated
# (PEP 594), removed in Python 3.13, and does not recognise JPEG files
# that lack a JFIF/Exif marker.
_IMAGE_SIGNATURES = (
    ("png", b"\x89PNG\r\n\x1a\n"),
    ("jpeg", b"\xff\xd8\xff"),
)

def isAllowedImage(data):
    """Return True if ``data`` starts like an image type in ALLOWED_IMAGES.

    Args:
        data: Bytes from the start of the file. Only the leading signature
            is examined, so passing just the first few bytes is enough.

    Returns:
        True when the data begins with a known signature whose type is
        listed in ``ALLOWED_IMAGES``; False otherwise, including for empty
        or non-bytes input.
    """
    if not isinstance(data, (bytes, bytearray)) or not data:
        return False

    for kind, signature in _IMAGE_SIGNATURES:
        if data.startswith(signature):
            return kind in ALLOWED_IMAGES

    return False
84
def shouldReturnJson():
    """Return True when the request accepts JSON but does not accept HTML."""
    accepted = request.accept_mimetypes
    if "application/json" not in accepted:
        return False
    return "text/html" not in accepted
88
def randomString(n):
    """Return ``n`` random characters drawn from ASCII letters and digits.

    Characters come from ``random.SystemRandom`` (OS entropy) rather than
    the module-level generator, so the result cannot be reproduced by
    seeding ``random`` and is not predictable from earlier outputs. This
    matters because the result is used for publicly reachable upload names.
    """
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(n))
92
def doFileUpload(file, fileType, fileTypeDesc):
    """Validate an uploaded file and save it under UPLOAD_DIR.

    Args:
        file: The uploaded file object. This function uses its
            ``filename``, ``stream`` and ``save`` members (presumably a
            Werkzeug ``FileStorage`` -- confirm against callers).
        fileType: ``"image"`` or ``"zip"``; selects the allowed extensions.
        fileTypeDesc: Human-readable description appended to the
            "Please upload " flash message.

    Returns:
        ``(url, filepath)`` on success, where ``url`` is
        ``"/uploads/<name>"`` and ``filepath`` is the path inside
        UPLOAD_DIR. ``(None, None)`` when the upload is rejected; a flash
        message explains why.

    Raises:
        AssertionError: if UPLOAD_DIR is not an existing directory.
        Exception: if ``fileType`` is neither ``"image"`` nor ``"zip"``.
    """
    if not file or file.filename == "":
        flash("No selected file", "danger")
        return None, None

    assert os.path.isdir(app.config["UPLOAD_DIR"]), "UPLOAD_DIR must exist"

    isImage = False
    if fileType == "image":
        allowedExtensions = ["jpg", "jpeg", "png"]
        isImage = True
    elif fileType == "zip":
        allowedExtensions = ["zip"]
    else:
        raise Exception("Invalid fileType")

    ext = getExtension(file.filename)
    if ext is None or ext not in allowedExtensions:
        flash("Please upload " + fileTypeDesc, "danger")
        return None, None

    if isImage:
        # Image type detection only looks at the file signature, so read
        # just the header instead of pulling the whole upload into memory.
        header = file.stream.read(32)
        if not isAllowedImage(header):
            flash("Uploaded image isn't actually an image", "danger")
            return None, None

    # Rewind so save() writes the file from its first byte.
    file.stream.seek(0)

    # The stored name is random; only the validated extension is kept.
    filename = randomString(10) + "." + ext
    filepath = os.path.join(app.config["UPLOAD_DIR"], filename)
    file.save(filepath)
    return "/uploads/" + filename, filepath
125
def make_flask_user_password(plaintext_str):
    """Hash a plaintext password with bcrypt and return the hash as text.

    Background, per the references below:
      http://passlib.readthedocs.io/en/stable/modular_crypt_format.html
      http://passlib.readthedocs.io/en/stable/lib/passlib.hash.bcrypt.html#format-algorithm
      https://github.com/lingthio/Flask-User/blob/master/flask_user/user_manager__settings.py#L166

    Flask_User stores passwords in the Modular Crypt Format. It allows the
    password algorithm to be customized: USER_PASSLIB_CRYPTCONTEXT_SCHEMES
    defaults to bcrypt, but if that default changes or is customized, this
    function needs adapting.

    Individual password values look like:
      $2b$12$.az4S999Ztvy/wa3UdQvMOpcki1Qn6VYPXmEFMIdWQyYs7ULnH.JW
      $XX$RR$SSSSSSSSSSSSSSSSSSSSSSHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH
    $XX     : selects the algorithm (2b is bcrypt).
    $RR     : selects bcrypt key expansion rounds (12 is 2**12 rounds).
    $SSS... : 22 chars of (random, per-password) salt.
     HHH... : 31 remaining chars of password hash (note no dollar sign).
    """
    import bcrypt

    hashed = bcrypt.hashpw(plaintext_str.encode("UTF-8"), bcrypt.gensalt())
    # hashpw's result is decoded unless it is already a str.
    return hashed if isinstance(hashed, str) else hashed.decode("UTF-8")
148
def loginUser(user):
    """Sign ``user`` in, returning True on success and False otherwise.

    Banned users are refused with a flash message. Otherwise the user is
    marked active, promoted to MEMBER if their rank is below NEW_MEMBER,
    and the change is committed before the Flask-Login session is created.
    """
    # User must have been authenticated
    if not user:
        return False

    if user.rank == UserRank.BANNED:
        flash("You have been banned.", "danger")
        return False

    user.active = True
    below_new_member = not user.rank.atLeast(UserRank.NEW_MEMBER)
    if below_new_member:
        user.rank = UserRank.MEMBER

    db.session.commit()

    # Check if user account has been disabled. is_active may be either a
    # plain value or a callable, so call it only when it is callable.
    active = user.is_active
    if callable(active):
        active = active()
    if not active:
        flash("Your account has not been enabled.", "danger")
        return False

    # Use Flask-Login to sign in user
    login_user(user, remember=True)
    signals.user_logged_in.send(current_app._get_current_object(), user=user)

    flash("You have signed in successfully.", "success")
    return True
182
183
def rank_required(rank):
    """Decorator factory restricting a view to users of at least ``rank``.

    Unauthenticated visitors are redirected to the "user.login" endpoint;
    authenticated users whose rank is too low get a 403.
    """
    def wrap_view(view):
        @wraps(view)
        def guarded_view(*args, **kwargs):
            if not current_user.is_authenticated:
                login_url = url_for("user.login")
                return redirect(login_url)

            has_rank = current_user.rank.atLeast(rank)
            if not has_rank:
                abort(403)

            return view(*args, **kwargs)

        return guarded_view
    return wrap_view
197
def getPackageByInfo(author, name):
    """Look up a non-soft-deleted package by author username and name.

    Returns None when either the user or the package is not found.
    """
    owner = User.query.filter_by(username=author).first()
    if owner is None:
        return None

    # first() already yields None when nothing matches.
    matches = Package.query.filter_by(name=name, author_id=owner.id, soft_deleted=False)
    return matches.first()
208
def is_package_page(f):
    """Decorator that resolves ``author``/``name`` route args to a package.

    The wrapped view receives ``package=`` in place of the ``author`` and
    ``name`` keyword arguments. Missing arguments abort with 400. If no
    package matches but a GAME package named ``<name>_game`` exists, the
    request is redirected to the same endpoint with that name; otherwise
    the request aborts with 404.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if "author" not in kwargs or "name" not in kwargs:
            abort(400)

        author = kwargs["author"]
        name = kwargs["name"]

        package = getPackageByInfo(author, name)
        if package is not None:
            # Swap the identifying arguments for the resolved package.
            kwargs.pop("author")
            kwargs.pop("name")
            return f(*args, package=package, **kwargs)

        # Fall back to the "<name>_game" naming and redirect if it is a game.
        game_name = name + "_game"
        package = getPackageByInfo(author, game_name)
        if package is None or package.type != PackageType.GAME:
            abort(404)

        redirect_args = dict(kwargs)
        redirect_args["name"] = game_name
        return redirect(url_for(request.endpoint, **redirect_args))

    return decorated_function
234
235
def addNotification(target, causer, title, url, package=None):
    """Queue a notification for one user or for each user in an iterable.

    Args:
        target: A single user, or an iterable of targets (handled
            recursively, one call per element).
        causer: The user responsible for the event; never notified about
            their own action.
        title: Notification text.
        url: Link associated with the notification.
        package: Optional related package.

    A target is only notified when its rank is at least NEW_MEMBER. Any
    existing notification with the same user, causer, title, url and
    package is deleted first. The new notification is added to the
    session; no commit is performed here.
    """
    # Probe for iterability on its own, so that a TypeError raised while
    # notifying an individual member is not mistaken for "not iterable".
    try:
        recipients = iter(target)
    except TypeError:
        recipients = None

    if recipients is not None:
        for recipient in recipients:
            addNotification(recipient, causer, title, url, package)
        return

    if target.rank.atLeast(UserRank.NEW_MEMBER) and target != causer:
        Notification.query.filter_by(user=target, causer=causer, title=title, url=url, package=package).delete()
        notif = Notification(target, causer, title, url, package)
        db.session.add(notif)
249
250
def addAuditLog(severity, causer, title, url, package=None, description=None):
    """Add an AuditLogEntry to the session; no commit is performed here."""
    # AuditLogEntry takes the causer before the severity.
    db.session.add(AuditLogEntry(causer, severity, title, url, package, description))
254
255
def clearNotifications(url):
    """Delete the current user's notifications for ``url`` and commit.

    Does nothing when the current user is not authenticated.
    """
    if not current_user.is_authenticated:
        return

    pending = Notification.query.filter_by(user=current_user, url=url)
    pending.delete()
    db.session.commit()
260
261
YESES = ["yes", "true", "1", "on"]

def isYes(val):
    """Tell whether ``val`` is an affirmative string, ignoring case.

    A falsy ``val`` (None, empty string) is returned as-is rather than
    being converted to False.
    """
    if not val:
        return val
    return val.lower() in YESES
266
267
def isNo(val):
    """Tell whether ``val`` is a non-empty value that is not affirmative.

    A falsy ``val`` (None, empty string) is returned as-is rather than
    being converted to False.
    """
    if not val:
        return val
    return not isYes(val)
270
def nonEmptyOrNone(str):
    """Map None and the empty string to None; return anything else as-is."""
    # The parameter name shadows the builtin but is kept for callers.
    is_blank = str is None or str == ""
    return None if is_blank else str