+class Alert(QtWidgets.QMessageBox):
+ """
+ An alert box dialog.
+ """
+ def __init__(self, common, message, icon=QtWidgets.QMessageBox.NoIcon, buttons=QtWidgets.QMessageBox.Ok, autostart=True):
+ super(Alert, self).__init__(None)
+
+ self.setWindowTitle(_("Tor Browser Launcher"))
+ self.setWindowIcon(QtGui.QIcon(common.paths['icon_file']))
+ self.setText(message)
+ self.setIcon(icon)
+ self.setStandardButtons(buttons)
+
+ if autostart:
+ self.exec_()
+
+
+class DownloadThread(QtCore.QThread):
+ """
+ Download a file in a separate thread.
+ """
+ progress_update = QtCore.pyqtSignal(int, int)
+ download_complete = QtCore.pyqtSignal()
+ download_error = QtCore.pyqtSignal(str, str)
+
+ def __init__(self, common, url, path):
+ super(DownloadThread, self).__init__()
+ self.common = common
+ self.url = url
+ self.path = path
+
+ # Use tor socks5 proxy, if enabled
+ if self.common.settings['download_over_tor']:
+ socks5_address = 'socks5://{}'.format(self.common.settings['tor_socks_address'])
+ self.proxies = {
+ 'https': socks5_address,
+ 'http': socks5_address
+ }
+ else:
+ self.proxies = None
+
+ def run(self):
+ with open(self.path, "wb") as f:
+ try:
+ # Start the request
+ r = requests.get(self.url,
+ headers={'User-Agent': 'torbrowser-launcher'},
+ stream=True, proxies=self.proxies)
+
+ # If status code isn't 200, something went wrong
+ if r.status_code != 200:
+ # Should we use the default mirror?
+ if self.common.settings['mirror'] != self.common.default_mirror:
+ message = (_("Download Error:") +
+ " {0}\n\n" + _("You are currently using a non-default mirror") +
+ ":\n{1}\n\n" + _("Would you like to switch back to the default?")).format(
+ r.status_code, self.common.settings['mirror']
+ )
+ self.download_error.emit('error_try_default_mirror', message)
+
+ # Should we switch to English?
+ elif self.common.language != 'en-US' and not self.common.settings['force_en-US']:
+ message = (_("Download Error:") +
+ " {0}\n\n" +
+ _("Would you like to try the English version of Tor Browser instead?")).format(
+ r.status_code
+ )
+ self.download_error.emit('error_try_forcing_english', message)
+
+ else:
+ message = (_("Download Error:") + " {0}").format(r.status_code)
+ self.download_error.emit('error', message)
+
+ r.close()
+ return
+
+ # Start streaming the download
+ total_bytes = int(r.headers.get('content-length'))
+ bytes_so_far = 0
+ for data in r.iter_content(chunk_size=4096):
+ bytes_so_far += len(data)
+ f.write(data)
+ self.progress_update.emit(total_bytes, bytes_so_far)
+
+ except requests.exceptions.SSLError:
+ message = _('Invalid SSL certificate for:\n{0}\n\nYou may be under attack.').format(self.url.decode())
+ if not self.common.settings['download_over_tor']:
+ message += "\n\n" + _('Try the download again using Tor?')
+ self.download_error.emit('error_try_tor', message)
+ else:
+ self.download_error.emit('error', message)
+ return
+
+ except requests.exceptions.ConnectionError:
+ # Connection error
+ if self.common.settings['download_over_tor']:
+ message = _("Error starting download:\n\n{0}\n\nTrying to download over Tor. "
+ "Are you sure Tor is configured correctly and running?").format(self.url.decode())
+ self.download_error.emit('error', message)
+ else:
+ message = _("Error starting download:\n\n{0}\n\nAre you connected to the internet?").format(
+ self.url.decode()
+ )
+ self.download_error.emit('error', message)
+
+ return
+
+ self.download_complete.emit()
+
+
+class VerifyThread(QtCore.QThread):
+ """
+ Verify the signature in a separate thread
+ """
+ success = QtCore.pyqtSignal()
+ error = QtCore.pyqtSignal(str)
+
+ def __init__(self, common):
+ super(VerifyThread, self).__init__()
+ self.common = common
+
+ def run(self):
+ with gpg.Context() as c:
+ c.set_engine_info(gpg.constants.protocol.OpenPGP, home_dir=self.common.paths['gnupg_homedir'])
+
+ sig = gpg.Data(file=self.common.paths['sig_file'])
+ signed = gpg.Data(file=self.common.paths['tarball_file'])
+
+ try:
+ c.verify(signature=sig, signed_data=signed)
+ except gpg.errors.BadSignatures as e:
+ result = str(e).split(": ")
+ if result[1] == 'No public key':
+ self.common.refresh_keyring(result[0])
+ self.error.emit(str(e))
+ else:
+ self.success.emit()
+
+
+class ExtractThread(QtCore.QThread):
+ """
+ Extract the tarball in a separate thread
+ """
+ success = QtCore.pyqtSignal()
+ error = QtCore.pyqtSignal()
+
+ def __init__(self, common):
+ super(ExtractThread, self).__init__()
+ self.common = common
+
+ def run(self):
+ extracted = False
+ try:
+ if self.common.paths['tarball_file'][-2:] == 'xz':
+ # if tarball is .tar.xz
+ xz = lzma.LZMAFile(self.common.paths['tarball_file'])
+ tf = tarfile.open(fileobj=xz)
+ tf.extractall(self.common.paths['tbb']['dir'])
+ extracted = True
+ else:
+ # if tarball is .tar.gz
+ if tarfile.is_tarfile(self.common.paths['tarball_file']):
+ tf = tarfile.open(self.common.paths['tarball_file'])
+ tf.extractall(self.common.paths['tbb']['dir'])
+ extracted = True
+ except:
+ pass
+
+ if extracted:
+ self.success.emit()
+ else:
+ self.error.emit()