2 # Copyright (C) 2018 rubenwardy
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 from flask import Blueprint
19 bp = Blueprint("github", __name__)
21 from flask import redirect, url_for, request, flash, abort, render_template, jsonify, current_app
22 from flask_user import current_user, login_required
23 from sqlalchemy import func, or_, and_
24 from flask_github import GitHub
25 from app import github, csrf
26 from app.models import db, User, APIToken, Package, Permission
27 from app.utils import loginUser, randomString, abs_url_for
28 from app.blueprints.api.support import error, handleCreateRelease
29 import hmac, requests, json
31 from flask_wtf import FlaskForm
32 from wtforms import SelectField, SubmitField
34 @bp.route("/github/start/")
36 return github.authorize("", redirect_uri=abs_url_for("github.callback"))
38 @bp.route("/github/view/")
39 def view_permissions():
40 url = "https://github.com/settings/connections/applications/" + \
41 current_app.config["GITHUB_CLIENT_ID"]
44 @bp.route("/github/callback/")
45 @github.authorized_handler
46 def callback(oauth_token):
47 next_url = request.args.get("next")
48 if oauth_token is None:
49 flash("Authorization failed [err=gh-oauth-login-failed]", "danger")
50 return redirect(url_for("user.login"))
53 url = "https://api.github.com/user"
54 r = requests.get(url, headers={"Authorization": "token " + oauth_token})
55 username = r.json()["login"]
57 # Get user by github username
58 userByGithub = User.query.filter(func.lower(User.github_username) == func.lower(username)).first()
60 # If logged in, connect
61 if current_user and current_user.is_authenticated:
62 if userByGithub is None:
63 current_user.github_username = username
65 flash("Linked github to account", "success")
66 return redirect(url_for("homepage.home"))
68 flash("Github account is already associated with another user", "danger")
69 return redirect(url_for("homepage.home"))
71 # If not logged in, log in
73 if userByGithub is None:
74 flash("Unable to find an account for that Github user", "danger")
75 return redirect(url_for("users.claim"))
76 elif loginUser(userByGithub):
77 if not current_user.hasPassword():
78 return redirect(next_url or url_for("users.set_password", optional=True))
80 return redirect(next_url or url_for("homepage.home"))
82 flash("Authorization failed [err=gh-login-failed]", "danger")
83 return redirect(url_for("user.login"))
86 @bp.route("/github/webhook/", methods=["POST"])
92 github_url = "github.com/" + json["repository"]["full_name"]
93 package = Package.query.filter(Package.repo.ilike("%{}%".format(github_url))).first()
95 return error(400, "Could not find package, did you set the VCS repo in CDB correctly? Expected {}".format(github_url))
97 # Get all tokens for package
98 tokens_query = APIToken.query.filter(or_(APIToken.package==package,
99 and_(APIToken.package==None, APIToken.owner==package.author)))
101 possible_tokens = tokens_query.all()
108 header_signature = request.headers.get('X-Hub-Signature')
109 if header_signature is None:
110 return error(403, "Expected payload signature")
112 sha_name, signature = header_signature.split('=')
113 if sha_name != 'sha1':
114 return error(403, "Expected SHA1 payload signature")
116 for token in possible_tokens:
117 mac = hmac.new(token.access_token.encode("utf-8"), msg=request.data, digestmod='sha1')
119 if hmac.compare_digest(str(mac.hexdigest()), signature):
123 if actual_token is None:
124 return error(403, "Invalid authentication, couldn't validate API token")
126 if not package.checkPerm(actual_token.owner, Permission.APPROVE_RELEASE):
127 return error(403, "You do not have the permission to approve releases")
133 event = request.headers.get("X-GitHub-Event")
136 title = json["head_commit"]["message"].partition("\n")[0]
137 elif event == "create" and json["ref_type"] == "tag":
140 elif event == "ping":
141 return jsonify({ "success": True, "message": "Ping successful" })
143 return error(400, "Unsupported event. Only 'push', `create:tag`, and 'ping' are supported.")
149 return handleCreateRelease(actual_token, package, title, ref)
152 class SetupWebhookForm(FlaskForm):
153 event = SelectField("Event Type", choices=[('create', 'New tag or GitHub release'), ('push', 'Push')])
154 submit = SubmitField("Save")
157 @bp.route("/github/callback/webhook/")
158 @github.authorized_handler
159 def callback_webhook(oauth_token=None):
160 pid = request.args.get("pid")
164 current_user.github_access_token = oauth_token
167 return redirect(url_for("github.setup_webhook", pid=pid))
170 @bp.route("/github/webhook/new/", methods=["GET", "POST"])
173 pid = request.args.get("pid")
177 package = Package.query.get(pid)
181 if not package.checkPerm(current_user, Permission.APPROVE_RELEASE):
182 flash("Only trusted members can use webhooks", "danger")
183 return redirect(package.getDetailsURL())
185 gh_user, gh_repo = package.getGitHubFullName()
186 if gh_user is None or gh_repo is None:
187 flash("Unable to get Github full name from repo address", "danger")
188 return redirect(package.getDetailsURL())
190 if current_user.github_access_token is None:
191 return github.authorize("write:repo_hook", \
192 redirect_uri=abs_url_for("github.callback_webhook", pid=pid))
194 form = SetupWebhookForm(formdata=request.form)
195 if request.method == "POST" and form.validate():
197 token.name = "GitHub Webhook for " + package.title
198 token.owner = current_user
199 token.access_token = randomString(32)
200 token.package = package
202 event = form.event.data
203 if event != "push" and event != "create":
206 if handleMakeWebhook(gh_user, gh_repo, package, \
207 current_user.github_access_token, event, token):
208 flash("Successfully created webhook", "success")
209 return redirect(package.getDetailsURL())
211 return redirect(url_for("github.setup_webhook", pid=package.id))
213 return render_template("github/setup_webhook.html", \
214 form=form, package=package)
217 def handleMakeWebhook(gh_user, gh_repo, package, oauth, event, token):
218 url = "https://api.github.com/repos/{}/{}/hooks".format(gh_user, gh_repo)
220 "Authorization": "token " + oauth
227 "url": abs_url_for("github.webhook"),
228 "content_type": "json",
229 "secret": token.access_token
233 # First check that the webhook doesn't already exist
234 r = requests.get(url, headers=headers)
236 if r.status_code == 401 or r.status_code == 403:
237 current_user.github_access_token = None
241 if r.status_code != 200:
242 flash("Failed to create webhook, received response from Github " +
243 str(r.status_code) + ": " +
244 str(r.json().get("message")), "danger")
247 for hook in r.json():
248 if hook.get("config") and hook["config"].get("url") and \
249 hook["config"]["url"] == data["config"]["url"]:
250 flash("Failed to create webhook, as it already exists", "danger")
255 r = requests.post(url, headers=headers, data=json.dumps(data))
257 if r.status_code == 201:
258 db.session.add(token)
263 elif r.status_code == 401 or r.status_code == 403:
264 current_user.github_access_token = None
270 flash("Failed to create webhook, received response from Github " +
271 str(r.status_code) + ": " +
272 str(r.json().get("message")), "danger")