git.lizzy.rs Git - cheatdb.git/blob - app/blueprints/github/__init__.py
Fix crash on existing GitHub App Integration
[cheatdb.git] / app / blueprints / github / __init__.py
1 # Content DB
2 # Copyright (C) 2018  rubenwardy
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 from flask import Blueprint
18
19 bp = Blueprint("github", __name__)
20
21 from flask import redirect, url_for, request, flash, abort, render_template, jsonify
22 from flask_user import current_user, login_required
23 from sqlalchemy import func
24 from flask_github import GitHub
25 from app import github, csrf
26 from app.models import db, User, APIToken, Package, Permission
27 from app.utils import loginUser, randomString, abs_url_for
28 from app.blueprints.api.support import error, handleCreateRelease
29 import hmac, requests, json
30
31 from flask_wtf import FlaskForm
32 from wtforms import SelectField, SubmitField
33
@bp.route("/github/start/")
def start():
    """Begin the GitHub OAuth login flow.

    Calls ``github.authorize`` with an empty scope string and the URL of the
    ``github.callback`` route as ``redirect_uri``, and returns whatever that
    call produces.
    """
    callback_url = url_for("github.callback")
    return github.authorize("", redirect_uri=callback_url)
37
def _is_safe_redirect_target(target):
    """Return True if *target* is a URL on the host serving this request.

    Used to vet the user-supplied ``next`` query parameter before it is
    handed to ``redirect()``, so the login callback cannot be abused as an
    open redirect to another site.
    """
    from urllib.parse import urljoin, urlparse

    if not target or target != target.strip():
        return False
    # Browsers treat backslashes like slashes and drop control characters,
    # either of which can turn a "relative" path into a foreign host.
    if "\\" in target or any(ord(ch) < 0x20 for ch in target):
        return False

    here = urlparse(request.host_url)
    there = urlparse(urljoin(request.host_url, target))
    return there.scheme in ("http", "https") and there.netloc == here.netloc


@bp.route("/github/callback/")
@github.authorized_handler
def callback(oauth_token):
    """Handle the OAuth redirect from GitHub for login / account linking.

    ``oauth_token`` is supplied by ``github.authorized_handler``; ``None``
    is treated as a failed authorization.

    With a token, the GitHub username is fetched from the GitHub API and
    matched case-insensitively against ``User.github_username``:

    * If a user is already logged in, the GitHub account is linked to them
      unless some user already has that GitHub username.
    * Otherwise the matching user is logged in and redirected to ``next``
      (when it points at this host), to the set-password page if they have
      no password, or to the homepage.

    Every failure path flashes a message and returns a redirect.
    """
    next_url = request.args.get("next")
    if not _is_safe_redirect_target(next_url):
        # Ignore off-site or malformed targets and use the defaults below.
        next_url = None

    if oauth_token is None:
        flash("Authorization failed [err=gh-oauth-login-failed]", "danger")
        return redirect(url_for("user.login"))

    # Get Github username
    url = "https://api.github.com/user"
    try:
        # The timeout keeps a stalled GitHub API from hanging the request.
        r = requests.get(url, headers={"Authorization": "token " + oauth_token},
                timeout=10)
    except requests.RequestException:
        flash("Authorization failed [err=gh-api-unreachable]", "danger")
        return redirect(url_for("user.login"))

    username = None
    if r.status_code == 200:
        try:
            username = r.json().get("login")
        except (ValueError, AttributeError):
            # Body was not JSON, or not a JSON object.
            username = None

    if not username:
        flash("Authorization failed [err=gh-user-lookup-failed]", "danger")
        return redirect(url_for("user.login"))

    # Get user by github username
    userByGithub = User.query.filter(
            func.lower(User.github_username) == func.lower(username)).first()

    # If logged in, connect
    if current_user and current_user.is_authenticated:
        if userByGithub is None:
            current_user.github_username = username
            db.session.commit()
            flash("Linked github to account", "success")
        else:
            flash("Github account is already associated with another user", "danger")
        return redirect(url_for("homepage.home"))

    # If not logged in, log in
    if userByGithub is None:
        flash("Unable to find an account for that Github user", "danger")
        return redirect(url_for("users.claim"))

    if not loginUser(userByGithub):
        flash("Authorization failed [err=gh-login-failed]", "danger")
        return redirect(url_for("user.login"))

    if not current_user.hasPassword():
        return redirect(next_url or url_for("users.set_password", optional=True))

    return redirect(next_url or url_for("homepage.home"))
78
79
@bp.route("/github/webhook/", methods=["POST"])
@csrf.exempt
def webhook():
    """Receive a GitHub webhook delivery and create a release from it.

    Steps, in order:

    1. Parse the JSON payload and find the package whose ``repo`` contains
       ``github.com/<repository.full_name>``.
    2. Verify the ``X-Hub-Signature`` header (HMAC-SHA1 of the raw request
       body) against the access token of each ``APIToken`` belonging to
       that package; the first matching token authenticates the request.
    3. Require the token's owner to hold ``Permission.APPROVE_RELEASE`` on
       the package.
    4. Dispatch on ``X-GitHub-Event``: ``push`` and ``create`` (tags only)
       produce a release via ``handleCreateRelease``; ``ping`` is
       acknowledged; anything else is rejected.

    Malformed input is answered through ``error(...)`` with a 400 or 403
    status rather than raising.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return error(400, "Expected JSON payload")

    repository = payload.get("repository")
    full_name = repository.get("full_name") if isinstance(repository, dict) else None
    if not full_name:
        return error(400, "Missing repository name")

    # Get package
    # NOTE(review): full_name is interpolated into a LIKE pattern, so "_" and
    # "%" in it act as wildcards and a repo whose URL merely starts with this
    # one also matches -- confirm whether an exact match is wanted.
    github_url = "github.com/" + str(full_name)
    package = Package.query.filter(Package.repo.like("%{}%".format(github_url))).first()
    if package is None:
        return error(400, "Unknown package")

    # Get all tokens for package
    possible_tokens = APIToken.query.filter_by(package=package).all()
    actual_token = None

    #
    # Check signature
    #

    header_signature = request.headers.get("X-Hub-Signature")
    if header_signature is None:
        return error(403, "Expected payload signature")

    # partition() never raises, unlike unpacking split("=") on a header with
    # zero or several "=" characters.
    sha_name, _, signature = header_signature.partition("=")
    if sha_name != "sha1":
        return error(403, "Expected SHA1 payload signature")

    # Compare as bytes: compare_digest() raises TypeError on non-ASCII str.
    signature_bytes = signature.encode("utf-8")
    for token in possible_tokens:
        mac = hmac.new(token.access_token.encode("utf-8"), msg=request.data, digestmod="sha1")

        if hmac.compare_digest(mac.hexdigest().encode("ascii"), signature_bytes):
            actual_token = token
            break

    if actual_token is None:
        return error(403, "Invalid authentication")

    if not package.checkPerm(actual_token.owner, Permission.APPROVE_RELEASE):
        return error(403, "Only trusted members can use webhooks")

    #
    # Check event
    #

    event = request.headers.get("X-GitHub-Event")
    if event == "push":
        ref = payload.get("after")
        head_commit = payload.get("head_commit")
        message = head_commit.get("message") if isinstance(head_commit, dict) else None
        # head_commit is null when the push deleted a branch.
        if not ref or not isinstance(message, str):
            return error(400, "Push event has no head commit")
        title = message.partition("\n")[0]
    elif event == "create" and payload.get("ref_type") == "tag":
        ref = payload.get("ref")
        if not ref:
            return error(400, "Create event has no ref")
        title = ref
    elif event == "ping":
        return jsonify({ "success": True, "message": "Ping successful" })
    else:
        return error(400, "Unsupported event. Only 'push', `create:tag`, and 'ping' are supported.")

    #
    # Perform release
    #

    return handleCreateRelease(actual_token, package, title, ref)
141
142
class SetupWebhookForm(FlaskForm):
    """Form used to pick which GitHub event a new webhook listens for."""

    # Each choice is (submitted value, label shown to the user).
    event = SelectField(
        "Event Type",
        choices=[
            ("create", "New tag"),
            ("push", "Push"),
        ],
    )
    submit = SubmitField("Save")
146
147
@bp.route("/github/callback/webhook/")
@github.authorized_handler
def callback_webhook(oauth_token=None):
    """Handle the OAuth redirect from GitHub for the webhook-setup flow.

    Stores ``oauth_token`` on the logged-in user and redirects to
    ``github.setup_webhook`` for the package given by the ``pid`` query
    parameter.

    Aborts with 404 when ``pid`` is missing. Redirects to the login page
    when nobody is logged in, and to the homepage with an error message
    when GitHub did not grant a token.
    """
    pid = request.args.get("pid")
    if pid is None:
        abort(404)

    # There must be a real user to attach the token to.
    if not (current_user and current_user.is_authenticated):
        return redirect(url_for("user.login"))

    # Storing None would make setup_webhook start the authorize flow again
    # immediately, so report the failure instead.
    if oauth_token is None:
        flash("Authorization failed [err=gh-oauth-webhook-failed]", "danger")
        return redirect(url_for("homepage.home"))

    current_user.github_access_token = oauth_token
    db.session.commit()

    return redirect(url_for("github.setup_webhook", pid=pid))
159
160
@bp.route("/github/webhook/new/", methods=["GET", "POST"])
@login_required
def setup_webhook():
    """Show and process the form that creates a GitHub webhook for a package.

    The package is identified by the ``pid`` query parameter; a missing or
    unknown id aborts with 404. The current user needs
    ``Permission.APPROVE_RELEASE`` on the package, and the package must
    yield a GitHub user and repository name. If the user has no stored
    GitHub access token, they are sent through ``github.authorize`` with the
    ``write:repo_hook`` scope first.

    On a valid POST an ``APIToken`` is built and passed to
    ``handleMakeWebhook``; success redirects to the package page, failure
    redirects back to this page. Otherwise the form is rendered.
    """
    pid = request.args.get("pid")
    if pid is None:
        abort(404)

    package = Package.query.get(pid)
    if package is None:
        abort(404)

    may_approve = package.checkPerm(current_user, Permission.APPROVE_RELEASE)
    if not may_approve:
        flash("Only trusted members can use webhooks", "danger")
        return redirect(package.getDetailsURL())

    gh_user, gh_repo = package.getGitHubFullName()
    if any(part is None for part in (gh_user, gh_repo)):
        flash("Unable to get Github full name from repo address", "danger")
        return redirect(package.getDetailsURL())

    access_token = current_user.github_access_token
    if access_token is None:
        return_to = abs_url_for("github.callback_webhook", pid=pid)
        return github.authorize("write:repo_hook", redirect_uri=return_to)

    form = SetupWebhookForm(formdata=request.form)
    if not (request.method == "POST" and form.validate()):
        # Plain page view, or a submission that failed validation.
        return render_template("github/setup_webhook.html",
                form=form, package=package)

    api_token = APIToken()
    api_token.name = "Github Webhook for " + package.title
    api_token.owner = current_user
    api_token.access_token = randomString(32)
    api_token.package = package

    event = form.event.data
    if event not in ("push", "create"):
        abort(500)

    created = handleMakeWebhook(gh_user, gh_repo, package,
            access_token, event, api_token)
    if created:
        return redirect(package.getDetailsURL())

    return redirect(url_for("github.setup_webhook", pid=package.id))
205
206
# Seconds to wait on each GitHub API call before giving up.
_GITHUB_TIMEOUT = 10


def _forget_github_token():
    """Clear the current user's stored GitHub access token and commit."""
    current_user.github_access_token = None
    db.session.commit()


def _flash_github_failure(response):
    """Flash an error describing an unexpected GitHub API *response*.

    Tolerates bodies that are not JSON or not a JSON object, in which case
    the message part is reported as ``None``.
    """
    try:
        body = response.json()
    except ValueError:
        body = None
    message = body.get("message") if isinstance(body, dict) else None

    flash("Failed to create webhook, received response from Github " +
        str(response.status_code) + ": " +
        str(message), "danger")


def handleMakeWebhook(gh_user, gh_repo, package, oauth, event, token):
    """Create a GitHub webhook on ``gh_user/gh_repo`` pointing at this site.

    Args:
        gh_user: Owner part of the GitHub repository name.
        gh_repo: Repository part of the GitHub repository name.
        package: The package the webhook is for (not used here).
        oauth: GitHub OAuth access token used to authorize the API calls.
        event: GitHub event name the hook subscribes to.
        token: ``APIToken`` whose ``access_token`` becomes the hook secret;
            it is added to the session and committed only on success.

    Returns:
        True if GitHub reported the hook as created (HTTP 201), else False.
        On 401/403 the current user's stored GitHub token is cleared without
        a flash message. Other failures, including an already existing hook
        for this site's webhook URL, flash an error.
    """
    url = "https://api.github.com/repos/{}/{}/hooks".format(gh_user, gh_repo)
    headers = {
        "Authorization": "token " + oauth
    }
    data = {
        "name": "web",
        "active": True,
        "events": [event],
        "config": {
            "url": abs_url_for("github.webhook"),
            "content_type": "json",
            "secret": token.access_token
        },
    }

    # First check that the webhook doesn't already exist
    try:
        r = requests.get(url, headers=headers, timeout=_GITHUB_TIMEOUT)
    except requests.RequestException:
        flash("Failed to create webhook, unable to contact Github", "danger")
        return False

    if r.status_code in (401, 403):
        _forget_github_token()
        return False

    if r.status_code != 200:
        _flash_github_failure(r)
        return False

    try:
        hooks = r.json()
    except ValueError:
        hooks = None

    if not isinstance(hooks, list):
        _flash_github_failure(r)
        return False

    for hook in hooks:
        # Some hooks come back without a usable "config"; skip those.
        config = hook.get("config") if isinstance(hook, dict) else None
        if config and config.get("url") == data["config"]["url"]:
            flash("Failed to create webhook, as it already exists", "danger")
            return False

    # Create it
    try:
        r = requests.post(url, headers=headers, data=json.dumps(data),
                timeout=_GITHUB_TIMEOUT)
    except requests.RequestException:
        flash("Failed to create webhook, unable to contact Github", "danger")
        return False

    if r.status_code == 201:
        db.session.add(token)
        db.session.commit()
        return True

    if r.status_code in (401, 403):
        _forget_github_token()
        return False

    _flash_github_failure(r)
    return False