2 # Copyright (C) 2018 rubenwardy
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 from flask import Blueprint
19 bp = Blueprint("github", __name__)
21 from flask import redirect, url_for, request, flash, abort, render_template, jsonify, current_app
22 from flask_user import current_user, login_required
23 from sqlalchemy import func
24 from flask_github import GitHub
25 from app import github, csrf
26 from app.models import db, User, APIToken, Package, Permission
27 from app.utils import loginUser, randomString, abs_url_for
28 from app.blueprints.api.support import error, handleCreateRelease
29 import hmac, requests, json
31 from flask_wtf import FlaskForm
32 from wtforms import SelectField, SubmitField
@bp.route("/github/start/")
def start():
    """Begin the GitHub OAuth login flow.

    Delegates to ``github.authorize`` with an empty scope string, passing the
    absolute URL of the ``github.callback`` endpoint as ``redirect_uri``.
    """
    # NOTE(review): the ``def`` line is absent from the extracted source; the
    # name ``start`` is assumed from the route path. Confirm it matches the
    # endpoint name other modules/templates pass to ``url_for``.
    return github.authorize("", redirect_uri=abs_url_for("github.callback"))
@bp.route("/github/view/")
def view_permissions():
    """Redirect to this application's connection settings page on GitHub.

    The target is GitHub's connected-applications settings path followed by
    the ``GITHUB_CLIENT_ID`` value from the Flask app config.
    """
    url = ("https://github.com/settings/connections/applications/"
           + current_app.config["GITHUB_CLIENT_ID"])
    # NOTE(review): the extracted source stops after building ``url``. A view
    # must return a response, so redirecting to ``url`` is assumed here.
    return redirect(url)
@bp.route("/github/callback/")
@github.authorized_handler
def callback(oauth_token):
    """Finish the GitHub OAuth flow: link the account or log the user in.

    Args:
        oauth_token: Token supplied by ``github.authorized_handler``;
            ``None`` is treated as a failed authorization.

    Returns:
        A redirect response. Failures redirect to ``user.login`` (or
        ``users.claim`` when no local account matches the GitHub user);
        success redirects to the ``next`` query argument when present,
        otherwise to the set-password page or the homepage.
    """
    # NOTE(review): ``next`` comes straight from the query string and is used
    # as a redirect target below; consider restricting it to same-site URLs.
    next_url = request.args.get("next")
    if oauth_token is None:
        flash("Authorization failed [err=gh-oauth-login-failed]", "danger")
        return redirect(url_for("user.login"))

    # Get Github username
    url = "https://api.github.com/user"
    r = requests.get(url, headers={"Authorization": "token " + oauth_token})
    # A rejected token yields an error document without "login"; fail cleanly
    # instead of raising KeyError.
    username = r.json().get("login") if r.status_code == 200 else None
    if username is None:
        flash("Authorization failed [err=gh-user-lookup-failed]", "danger")
        return redirect(url_for("user.login"))

    # Get user by github username (case-insensitive match)
    userByGithub = User.query.filter(
        func.lower(User.github_username) == func.lower(username)).first()

    # If logged in, connect
    if current_user and current_user.is_authenticated:
        if userByGithub is None:
            current_user.github_username = username
            # NOTE(review): one statement was dropped here by the extraction;
            # a commit is assumed so the link is persisted.
            db.session.commit()
            flash("Linked github to account", "success")
        else:
            flash("Github account is already associated with another user", "danger")
        return redirect(url_for("homepage.home"))

    # If not logged in, log in
    if userByGithub is None:
        flash("Unable to find an account for that Github user", "danger")
        return redirect(url_for("users.claim"))

    if loginUser(userByGithub):
        if not current_user.hasPassword():
            return redirect(next_url or url_for("users.set_password", optional=True))
        return redirect(next_url or url_for("homepage.home"))

    flash("Authorization failed [err=gh-login-failed]", "danger")
    return redirect(url_for("user.login"))
# NOTE(review): the lines between the route decorator and the package lookup
# were dropped by the extraction. ``csrf.exempt`` is assumed because GitHub
# cannot supply a CSRF token; the name ``webhook`` matches the
# ``abs_url_for("github.webhook")`` reference elsewhere in this file.
@bp.route("/github/webhook/", methods=["POST"])
@csrf.exempt
def webhook():
    """Receive a GitHub webhook delivery and create a release from it.

    The package is found from the payload's repository full name, the
    ``X-Hub-Signature`` header is checked against an HMAC-SHA1 of the raw
    body keyed with each of the package's API tokens, and the matching
    token's owner must hold ``Permission.APPROVE_RELEASE``.

    Returns:
        The result of ``handleCreateRelease`` for ``push`` and tag ``create``
        events, a JSON success document for ``ping``, or an ``error(...)``
        response (400 / 403) otherwise.
    """
    # Named ``payload`` so the module-level ``json`` import is not shadowed.
    payload = request.json

    # Get package
    try:
        github_url = "github.com/" + payload["repository"]["full_name"]
    except (KeyError, TypeError):
        return error(400, "Malformed webhook payload")

    package = Package.query.filter(Package.repo.like("%{}%".format(github_url))).first()
    if package is None:
        return error(400, "Unknown package")

    # Get all tokens for package
    possible_tokens = APIToken.query.filter_by(package=package).all()

    #
    # Check signature
    #
    header_signature = request.headers.get('X-Hub-Signature')
    if header_signature is None:
        return error(403, "Expected payload signature")

    # partition() never raises, unlike unpacking split('=') when the header
    # holds zero or several '=' characters.
    sha_name, _, signature = header_signature.partition('=')
    if sha_name != 'sha1':
        return error(403, "Expected SHA1 payload signature")

    actual_token = None
    for token in possible_tokens:
        mac = hmac.new(token.access_token.encode("utf-8"), msg=request.data, digestmod='sha1')
        # Compare as bytes: compare_digest rejects non-ASCII str input.
        if hmac.compare_digest(mac.hexdigest().encode("ascii"), signature.encode("utf-8")):
            actual_token = token
            break

    if actual_token is None:
        return error(403, "Invalid authentication")

    if not package.checkPerm(actual_token.owner, Permission.APPROVE_RELEASE):
        return error(403, "Only trusted members can use webhooks")

    #
    # Check event
    #
    event = request.headers.get("X-GitHub-Event")
    try:
        if event == "push":
            # NOTE(review): the ``ref`` assignments in both branches were
            # dropped by the extraction. "after" (push) and "ref" (create) are
            # assumed from GitHub's payload format; the tag title is assumed
            # to be the tag name.
            ref = payload["after"]
            title = payload["head_commit"]["message"].partition("\n")[0]
        elif event == "create" and payload["ref_type"] == "tag":
            ref = payload["ref"]
            title = ref
        elif event == "ping":
            return jsonify({ "success": True, "message": "Ping successful" })
        else:
            return error(400, "Unsupported event. Only 'push', `create:tag`, and 'ping' are supported.")
    except (KeyError, TypeError):
        # e.g. a push that deletes a branch has a null head_commit
        return error(400, "Malformed webhook payload")

    #
    # Perform release
    #
    return handleCreateRelease(actual_token, package, title, ref)
class SetupWebhookForm(FlaskForm):
    """Form for choosing which GitHub event a new webhook should fire on."""

    # (submitted value, displayed label) pairs offered by the drop-down.
    event = SelectField(
        "Event Type",
        choices=[
            ('create', 'New tag or GitHub release'),
            ('push', 'Push'),
        ],
    )
    submit = SubmitField("Save")
@bp.route("/github/callback/webhook/")
@github.authorized_handler
def callback_webhook(oauth_token=None):
    """Store the GitHub token obtained for webhook setup, then resume setup.

    Args:
        oauth_token: Token supplied by ``github.authorized_handler``;
            ``None`` means the authorization did not succeed.

    Returns:
        A redirect to ``github.setup_webhook`` for the package id given in
        the ``pid`` query argument, or to the homepage when authorization
        failed.
    """
    pid = request.args.get("pid")
    # NOTE(review): lines following the ``pid`` lookup were dropped by the
    # extraction; a 404 on a missing ``pid`` is assumed.
    if pid is None:
        abort(404)

    # Do not store a failed authorization: setup_webhook would immediately
    # send the user back to GitHub again.
    if oauth_token is None:
        flash("Authorization failed [err=gh-oauth-login-failed]", "danger")
        return redirect(url_for("homepage.home"))

    current_user.github_access_token = oauth_token
    # NOTE(review): a commit is assumed here so the token is persisted.
    db.session.commit()

    return redirect(url_for("github.setup_webhook", pid=pid))
# NOTE(review): the two lines after the route decorator were dropped by the
# extraction. The name ``setup_webhook`` matches the
# ``url_for("github.setup_webhook", ...)`` calls in this file;
# ``login_required`` is assumed because the view reads ``current_user`` fields.
@bp.route("/github/webhook/new/", methods=["GET", "POST"])
@login_required
def setup_webhook():
    """Show and process the form that creates a GitHub webhook for a package.

    The package id is read from the ``pid`` query argument. The current user
    must hold ``Permission.APPROVE_RELEASE`` on the package, and the package
    must resolve to a GitHub user/repo pair. When the user has no stored
    GitHub access token they are sent to GitHub to grant ``write:repo_hook``.

    Returns:
        A redirect (package details, GitHub authorization, or back to this
        page after a failed attempt) or the rendered
        ``github/setup_webhook.html`` template.
    """
    pid = request.args.get("pid")
    # NOTE(review): the guards after the ``pid`` and package lookups were
    # dropped by the extraction; 404 responses are assumed.
    if pid is None:
        abort(404)

    package = Package.query.get(pid)
    if package is None:
        abort(404)

    if not package.checkPerm(current_user, Permission.APPROVE_RELEASE):
        flash("Only trusted members can use webhooks", "danger")
        return redirect(package.getDetailsURL())

    gh_user, gh_repo = package.getGitHubFullName()
    if gh_user is None or gh_repo is None:
        flash("Unable to get Github full name from repo address", "danger")
        return redirect(package.getDetailsURL())

    if current_user.github_access_token is None:
        return github.authorize(
            "write:repo_hook",
            redirect_uri=abs_url_for("github.callback_webhook", pid=pid))

    form = SetupWebhookForm(formdata=request.form)
    if request.method == "POST" and form.validate():
        # The token's access_token doubles as the webhook secret; it is only
        # saved by handleMakeWebhook once GitHub accepts the hook.
        # NOTE(review): the constructor line was dropped by the extraction;
        # a bare ``APIToken()`` is assumed.
        token = APIToken()
        token.name = "GitHub Webhook for " + package.title
        token.owner = current_user
        token.access_token = randomString(32)
        token.package = package

        event = form.event.data
        if event not in ("push", "create"):
            # NOTE(review): the statement under this guard was dropped by the
            # extraction; rejecting the request with 400 is assumed.
            abort(400)

        if handleMakeWebhook(gh_user, gh_repo, package,
                             current_user.github_access_token, event, token):
            flash("Successfully created webhook", "success")
            return redirect(package.getDetailsURL())

        # Failure: reload this page (handleMakeWebhook flashed the reason, or
        # cleared the token so authorization is requested again).
        return redirect(url_for("github.setup_webhook", pid=package.id))

    return render_template("github/setup_webhook.html",
                           form=form, package=package)
def _flash_github_failure(response):
    """Flash a webhook-creation failure built from a GitHub API response."""
    try:
        message = response.json().get("message")
    except (ValueError, AttributeError):
        # Body was not JSON, or not a JSON object: no message to show.
        message = None
    flash("Failed to create webhook, received response from Github " +
          str(response.status_code) + ": " +
          str(message), "danger")


def handleMakeWebhook(gh_user, gh_repo, package, oauth, event, token):
    """Create a GitHub webhook on ``gh_user/gh_repo`` pointing at this site.

    Args:
        gh_user: GitHub owner name used in the hooks API URL.
        gh_repo: GitHub repository name used in the hooks API URL.
        package: Package the webhook is for (not read by this function).
        oauth: GitHub OAuth token sent in the ``Authorization`` header.
        event: GitHub event name the hook should subscribe to.
        token: API token whose ``access_token`` becomes the webhook secret;
            added to the database session only when GitHub answers 201.

    Returns:
        A truthy value when the hook was created, falsy otherwise (the caller
        branches on it). On 401/403 the current user's stored GitHub token is
        cleared; other failures flash an explanatory message.
    """
    url = "https://api.github.com/repos/{}/{}/hooks".format(gh_user, gh_repo)
    headers = {
        "Authorization": "token " + oauth
    }
    # NOTE(review): only the three "config" entries survive in the extracted
    # source. The outer keys follow GitHub's create-webhook API ("web" is the
    # required name) and are assumed; ``event`` has no other use here.
    data = {
        "name": "web",
        "active": True,
        "events": [event],
        "config": {
            "url": abs_url_for("github.webhook"),
            "content_type": "json",
            "secret": token.access_token
        },
    }

    # First check that the webhook doesn't already exist
    r = requests.get(url, headers=headers)

    if r.status_code in (401, 403):
        # Stored token is no longer usable; clearing it makes the setup page
        # request authorization again.
        current_user.github_access_token = None
        # NOTE(review): the statements after each visible assignment were
        # dropped by the extraction; commits and boolean returns are assumed
        # throughout this function.
        db.session.commit()
        return False

    if r.status_code != 200:
        _flash_github_failure(r)
        return False

    for hook in r.json():
        if hook.get("config") and hook["config"].get("url") and \
                hook["config"]["url"] == data["config"]["url"]:
            flash("Failed to create webhook, as it already exists", "danger")
            return False

    # Create it
    r = requests.post(url, headers=headers, data=json.dumps(data))

    if r.status_code == 201:
        db.session.add(token)
        db.session.commit()
        return True

    if r.status_code in (401, 403):
        current_user.github_access_token = None
        db.session.commit()
        return False

    _flash_github_failure(r)
    return False