]> git.lizzy.rs Git - cheatdb.git/blob - app/utils.py
cc338b4b04ee37ea12fc4762477c51cec9df1f1a
[cheatdb.git] / app / utils.py
1 # ContentDB
2 # Copyright (C) 2018  rubenwardy
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17
18 from flask import request, flash, abort, redirect
19 from flask_user import *
20 from flask_login import login_user, logout_user
21 from .models import *
22 from . import app
23 import random, string, os, imghdr
24 from urllib.parse import urljoin
25
26 # These are given to Jinja in template_filters.py
27
def abs_url_for(path, **kwargs):
        """Build an external (absolute) URL for an endpoint via ``url_for``.

        The scheme is "https" when the configured ``BASE_URL`` begins with
        "https" and "http" otherwise.  Extra keyword arguments are passed
        through to ``url_for`` unchanged.
        """
        base_url = app.config["BASE_URL"]
        if base_url.startswith("https"):
                scheme = "https"
        else:
                scheme = "http"
        return url_for(path, _external=True, _scheme=scheme, **kwargs)
31
def abs_url(path):
        """Resolve *path* against the configured ``BASE_URL`` with ``urljoin``."""
        base_url = app.config["BASE_URL"]
        return urljoin(base_url, path)
34
def url_set_query(**kwargs):
        """Return a URL for the current endpoint with query values overridden.

        Starts from a copy of the current request's arguments, applies the
        given keyword arguments on top, and hands the result to ``url_for``
        for ``request.endpoint``.
        """
        query = dict(request.args)
        for key, value in kwargs.items():
                query[key] = value
        return url_for(request.endpoint, **query)
39
def get_int_or_abort(v, default=None):
        """Convert *v* to an int, or abort the request with HTTP 400.

        Args:
                v: The raw value to convert (typically a query-string value).
                default: Returned as-is when *v* is missing (``None``) or an
                        empty string.

        Returns:
                ``int(v)``, or *default* when *v* is ``None`` or ``""``.

        Aborts with 400 when *v* cannot be converted to an integer.
        """
        # A missing or empty value means "not supplied": fall back to the
        # default instead of trying to convert it (int(None) would raise).
        if v is None or v == "":
                return default

        try:
                return int(v)
        except (ValueError, TypeError):
                # TypeError covers values int() cannot accept at all, so bad
                # input results in a 400 rather than an unhandled exception.
                abort(400)
48
def getExtension(filename):
        """Return the lower-cased text after the last "." in *filename*.

        Returns ``None`` when *filename* contains no "." at all.  A trailing
        dot yields an empty string.
        """
        _, dot, suffix = filename.rpartition(".")
        if not dot:
                return None
        return suffix.lower()
51
def isFilenameAllowed(filename, exts):
        """Return whether the extension of *filename* is contained in *exts*."""
        extension = getExtension(filename)
        return extension in exts
54
ALLOWED_IMAGES = set(["jpeg", "png"])

# Leading "magic number" bytes for the image formats we can recognise.
# Checked directly instead of through the `imghdr` module, which is
# deprecated (PEP 594) and does not recognise every valid JPEG header.
_IMAGE_SIGNATURES = (
        ("png", b"\x89PNG\r\n\x1a\n"),
        ("jpeg", b"\xff\xd8\xff"),
)

def isAllowedImage(data):
        """Return True when *data* starts like an image of an allowed type.

        Args:
                data: The raw bytes of the file, or at least its first 8 bytes.

        Returns:
                ``True`` if the leading bytes match the signature of a format
                listed in ``ALLOWED_IMAGES``; ``False`` otherwise, including
                for empty input.
        """
        if not data:
                return False

        header = bytes(data[:8])
        for kind, magic in _IMAGE_SIGNATURES:
                if header.startswith(magic):
                        return kind in ALLOWED_IMAGES
        return False
58
def shouldReturnJson():
        """Return True when the client accepts JSON but does not accept HTML.

        Decided from the current request's ``accept_mimetypes``.
        """
        accepted = request.accept_mimetypes
        if "application/json" not in accepted:
                return False
        return "text/html" not in accepted
62
def randomString(n):
        """Return a random string of *n* ASCII letters and digits.

        Characters are drawn with the ``secrets`` module rather than
        ``random``, so the result is not predictable from earlier outputs.
        This matters because the strings end up in publicly reachable
        upload file names.

        Args:
                n: Number of characters to generate; ``0`` gives ``""``.
        """
        # Imported here so the block does not depend on the module header.
        import secrets

        alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
        return "".join(secrets.choice(alphabet) for _ in range(n))
66
def doFileUpload(file, fileType, fileTypeDesc):
        """Validate an uploaded file and store it under ``UPLOAD_DIR``.

        Args:
                file: The uploaded file object; must provide ``filename``,
                        ``stream`` and ``save()``.
                fileType: Either "image" (jpg/jpeg/png, content-checked) or
                        "zip".
                fileTypeDesc: Human-readable description of the expected
                        upload, appended to the "Please upload " message.

        Returns:
                ``(url, path)`` where *url* is ``"/uploads/<name>"`` and *path*
                is the file's location inside ``UPLOAD_DIR``; ``(None, None)``
                when the upload is rejected (a "danger" message is flashed).

        Raises:
                Exception: if *fileType* is neither "image" nor "zip".
        """
        if not file or file.filename == "":
                flash("No selected file", "danger")
                return None, None

        assert os.path.isdir(app.config["UPLOAD_DIR"]), "UPLOAD_DIR must exist"

        if fileType == "image":
                allowedExtensions = ["jpg", "jpeg", "png"]
                isImage = True
        elif fileType == "zip":
                allowedExtensions = ["zip"]
                isImage = False
        else:
                raise Exception("Invalid fileType")

        ext = getExtension(file.filename)
        if ext is None or ext not in allowedExtensions:
                flash("Please upload " + fileTypeDesc, "danger")
                return None, None

        # Only the leading bytes are needed to recognise the image format,
        # so avoid pulling the whole upload into memory for the check.
        if isImage and not isAllowedImage(file.stream.read(32)):
                flash("Uploaded image isn't actually an image", "danger")
                return None, None

        # Rewind so that save() writes the file from its first byte.
        file.stream.seek(0)

        # Store under a random name rather than the client-supplied one.
        filename = randomString(10) + "." + ext
        filepath = os.path.join(app.config["UPLOAD_DIR"], filename)
        file.save(filepath)
        return "/uploads/" + filename, filepath
99
def make_flask_user_password(plaintext_str):
        """Hash *plaintext_str* with bcrypt and return the hash as a ``str``.

        The plaintext is encoded as UTF-8 and hashed with a freshly
        generated salt.
        """
        # Flask_User stores passwords in the Modular Crypt Format:
        #   http://passlib.readthedocs.io/en/stable/modular_crypt_format.html
        #   http://passlib.readthedocs.io/en/stable/lib/passlib.hash.bcrypt.html#format-algorithm
        #   https://github.com/lingthio/Flask-User/blob/master/flask_user/user_manager__settings.py#L166
        # Flask_User allows customizing password algorithms;
        # USER_PASSLIB_CRYPTCONTEXT_SCHEMES defaults to bcrypt, so if that
        # default changes or is customized this code needs adapting.
        #
        # A stored value looks like:
        #   $2b$12$.az4S999Ztvy/wa3UdQvMOpcki1Qn6VYPXmEFMIdWQyYs7ULnH.JW
        #   $XX$RR$SSSSSSSSSSSSSSSSSSSSSSHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH
        # $XX     : selects the algorithm (2b is bcrypt)
        # $RR     : bcrypt key expansion rounds (12 is 2**12 rounds)
        # $SSS... : 22 chars of (random, per-password) salt
        #  HHH... : 31 remaining chars of password hash (no dollar sign)
        import bcrypt

        hashed = bcrypt.hashpw(plaintext_str.encode("UTF-8"), bcrypt.gensalt())
        # hashpw may hand back bytes; callers always get text.
        return hashed if isinstance(hashed, str) else hashed.decode("UTF-8")
122
def loginUser(user):
        """Sign *user* in through Flask-Login and report whether it worked.

        Returns ``False`` (flashing a message where applicable) when *user*
        is falsy, is banned, or is not active; otherwise marks the account
        active, raises its rank to MEMBER if it is below NEW_MEMBER, commits,
        logs the user in with a remember cookie, emits the ``user_logged_in``
        signal and returns ``True``.
        """
        # User must have been authenticated
        if not user:
                return False

        if user.rank == UserRank.BANNED:
                flash("You have been banned.", "danger")
                return False

        user.active = True
        below_new_member = not user.rank.atLeast(UserRank.NEW_MEMBER)
        if below_new_member:
                user.rank = UserRank.MEMBER

        db.session.commit()

        # Check if user account has been disabled; is_active may be either
        # a plain attribute or a method.
        active = user.is_active
        if callable(active):
                active = active()
        if not active:
                flash("Your account has not been enabled.", "danger")
                return False

        # Use Flask-Login to sign in user
        login_user(user, remember=True)
        sender = current_app._get_current_object()
        signals.user_logged_in.send(sender, user=user)

        flash("You have signed in successfully.", "success")
        return True
156
157
def rank_required(rank):
        """Decorator factory restricting a view to users of at least *rank*.

        Unauthenticated visitors are redirected to the ``user.login``
        endpoint; authenticated users whose rank is too low get a 403.
        """
        def decorator(f):
                @wraps(f)
                def guarded_view(*args, **kwargs):
                        if not current_user.is_authenticated:
                                login_url = url_for("user.login")
                                return redirect(login_url)

                        has_rank = current_user.rank.atLeast(rank)
                        if not has_rank:
                                abort(403)

                        return f(*args, **kwargs)

                return guarded_view
        return decorator
171
def getPackageByInfo(author, name):
        """Look up a non-soft-deleted package by author username and name.

        Aborts with 404 when either the user or the package is not found.
        """
        owner = User.query.filter_by(username=author).first()
        if owner is None:
                abort(404)

        criteria = dict(name=name, author_id=owner.id, soft_deleted=False)
        found = Package.query.filter_by(**criteria).first()
        if found is None:
                abort(404)

        return found
182
def is_package_page(f):
        """Decorator that swaps ``author``/``name`` kwargs for a ``package``.

        The wrapped view is called with ``package=<looked-up package>`` in
        place of the ``author`` and ``name`` keyword arguments.  Aborts with
        400 when either of those keyword arguments is missing.
        """
        @wraps(f)
        def package_view(*args, **kwargs):
                if "author" not in kwargs or "name" not in kwargs:
                        abort(400)

                author = kwargs.pop("author")
                name = kwargs.pop("name")
                package = getPackageByInfo(author, name)

                return f(*args, package=package, **kwargs)

        return package_view
197
198
def addNotification(target, causer, title, url, package=None):
        """Queue a notification for one user or for every user in an iterable.

        Args:
                target: A single user, or an iterable of users (handled
                        recursively, one call per element).
                causer: The user who triggered the notification; never
                        notified about their own action.
                title: Notification title.
                url: URL the notification refers to.
                package: Optional related package.

        For a single target, any existing notification with the same
        user/causer/title/url/package is deleted before the new one is added
        to the session.  Nothing is committed here.
        """
        # Only the iterability probe is guarded: a TypeError raised while
        # handling an individual element must propagate instead of being
        # mistaken for "target is not iterable".
        try:
                targets = iter(target)
        except TypeError:
                targets = None

        if targets is not None:
                for x in targets:
                        addNotification(x, causer, title, url, package)
                return

        if target.rank.atLeast(UserRank.NEW_MEMBER) and target != causer:
                # Replace any identical pending notification rather than
                # stacking duplicates.
                Notification.query.filter_by(user=target, causer=causer, title=title, url=url, package=package).delete()
                notif = Notification(target, causer, title, url, package)
                db.session.add(notif)
212
213
def addAuditLog(severity, causer, title, url, package=None, description=None):
        """Create an ``AuditLogEntry`` and add it to the database session.

        Nothing is committed here.
        """
        db.session.add(
                AuditLogEntry(causer, severity, title, url, package, description)
        )
217
218
def clearNotifications(url):
        """Delete the current user's notifications for *url* and commit.

        Does nothing when the current user is not authenticated.
        """
        if not current_user.is_authenticated:
                return

        matching = Notification.query.filter_by(user=current_user, url=url)
        matching.delete()
        db.session.commit()
223
224
# Lower-case spellings that count as an affirmative value.
YESES = ["yes", "true", "1", "on"]

def isYes(val):
        """Return whether *val* is an affirmative string (case-insensitive).

        A falsy *val* (``None``, ``""``) is returned unchanged rather than
        converted to ``False``.
        """
        if not val:
                return val
        return val.lower() in YESES
229
230
def isNo(val):
        """Return whether *val* is a non-empty value that is not a "yes".

        A falsy *val* (``None``, ``""``) is returned unchanged.
        """
        if not val:
                return val
        return not isYes(val)
233
def nonEmptyOrNone(str):
        """Return *str* unchanged, or ``None`` if it is ``None`` or ``""``."""
        is_blank = str is None or str == ""
        return None if is_blank else str