]> git.lizzy.rs Git - nhentai.git/blobdiff - nhentai/downloader.py
fix bug of proxy while downloading doujinshi
[nhentai.git] / nhentai / downloader.py
index 76dbb6296245ef54cc4433243024bf20b3936574..f0aa9fde874f9c13c13273e3a1a7e458651ce5b7 100644 (file)
-#coding: utf-8
-import os
+# coding: utf-8
+
+import multiprocessing
+import signal
+
+from future.builtins import str as text
 import sys
-import socket
-import threading
-import Queue
+import os
 import requests
-from urlparse import urlparse
-from logger import logger
+import time
 
+try:
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
 
-# global timeout
-timeout = 30
-THREAD_TIMEOUT = 99999
-socket.setdefaulttimeout(timeout)
+from nhentai import constant
+from nhentai.logger import logger
+from nhentai.parser import request
+from nhentai.utils import Singleton
 
+requests.packages.urllib3.disable_warnings()
+semaphore = multiprocessing.Semaphore(1)
 
-class Downloader(object):
-    _instance = None
-    kill_received = False
 
-    def __new__(cls, *args, **kwargs):
-        if not cls._instance:
-            cls._instance = super(Downloader, cls).__new__(cls, *args, **kwargs)
-        return cls._instance
class NHentaiImageNotExistException(Exception):
    """Raised when the image server answers a download request with a non-200 status."""
 
-    def __init__(self, path='', thread=1):
-        if not isinstance(thread, (int, )) or thread < 1 or thread > 10:
-            raise ValueError('Invalid threads count')
-        self.path = str(path)
-        self.thread_count = thread
-        self.threads = []
 
-    def _download(self, url, folder='', filename=''):
-        if not os.path.exists(folder):
-            try:
-                os.mkdir(folder)
-            except os.error, e:
-                logger.error('%s error %s' % (threading.currentThread().getName(), str(e)))
-                sys.exit()
class Downloader(Singleton):
    """Multiprocess image downloader.

    Fetches every URL on the queue into ``path/folder`` using a pool of
    ``size`` worker processes.  Per-download results are (status, data)
    tuples with the protocol:

        1  -> success (or file already present), data is the url
        0  -> fatal error, data is None
        -1 -> server returned a non-200 status, data is the url
        -3 -> interrupted (Ctrl-C), data is None
    """

    def __init__(self, path='', size=5, timeout=30, delay=0):
        # size:    number of worker processes in the download pool
        # timeout: per-request timeout in seconds
        # delay:   optional sleep (seconds) before each request, for rate limiting
        self.size = size
        self.path = str(path)
        self.timeout = timeout
        self.delay = delay

    def download_(self, url, folder='', filename='', retried=0, proxy=None):
        """Download a single image to ``folder`` and return a (status, data) tuple.

        ``filename`` defaults to the basename of the url path; the stem is
        zero-padded to three digits so pages sort correctly on disk.
        ``retried`` counts HTTP/timeout retries (max 3).
        """
        if self.delay:
            time.sleep(self.delay)
        logger.info('Starting to download {0} ...'.format(url))
        filename = filename if filename else os.path.basename(urlparse(url).path)
        base_filename, extension = os.path.splitext(filename)

        # Single canonical on-disk path, used for the exists-check, the write
        # and the cleanup on failure.
        save_file_path = os.path.join(folder, base_filename.zfill(3) + extension)
        try:
            if os.path.exists(save_file_path):
                logger.warning('File: {0} exists, ignoring'.format(save_file_path))
                return 1, url

            response = None
            with open(save_file_path, "wb") as f:
                # Up to 10 attempts at the request itself (network hiccups);
                # a non-200 answer aborts immediately via the exception below.
                i = 0
                while i < 10:
                    try:
                        response = request('get', url, stream=True, timeout=self.timeout, proxies=proxy)
                        if response.status_code != 200:
                            raise NHentaiImageNotExistException

                    except NHentaiImageNotExistException as e:
                        # Propagate to the outer handler so the empty file is removed.
                        raise e

                    except Exception as e:
                        i += 1
                        if not i < 10:
                            logger.critical(str(e))
                            return 0, None
                        continue

                    break

                length = response.headers.get('content-length')
                if length is None:
                    f.write(response.content)
                else:
                    for chunk in response.iter_content(2048):
                        f.write(chunk)

        except (requests.HTTPError, requests.Timeout) as e:
            if retried < 3:
                logger.warning('Warning: {0}, retrying({1}) ...'.format(str(e), retried))
                # BUGFIX: return the recursive call's own (status, data) tuple.
                # The previous `return 0, self.download_(...)` wrapped it as
                # (0, <tuple>), so even a successful retry was reported as a
                # fatal error to _download_callback.
                return self.download_(url=url, folder=folder, filename=filename,
                                      retried=retried + 1, proxy=proxy)
            else:
                return 0, None

        except NHentaiImageNotExistException:
            # open() above already created an empty file; remove it.
            os.remove(save_file_path)
            return -1, url

        except Exception as e:
            import traceback
            traceback.print_stack()
            logger.critical(str(e))
            return 0, None

        except KeyboardInterrupt:
            return -3, None

        return 1, url

    def _download_callback(self, result):
        """Pool callback: log the outcome of one download_ call."""
        result, data = result
        if result == 0:
            logger.warning('fatal errors occurred, ignored')
        elif result == -1:
            logger.warning('url {} return status code 404'.format(data))
        elif result == -2:
            # NOTE(review): no current code path returns -2 (interrupts yield -3);
            # kept for compatibility with older workers.
            logger.warning('Ctrl-C pressed, exiting sub processes ...')
        elif result == -3:
            # workers wont be run, just pass
            pass
        else:
            logger.log(15, '{0} downloaded successfully'.format(data))

    def download(self, queue, folder=''):
        """Download every url in ``queue`` into ``self.path``/``folder``.

        Creates the destination directory if needed, then fans the urls out
        to a multiprocessing pool of ``self.size`` workers and blocks until
        all of them finish.
        """
        if not isinstance(folder, text):
            folder = str(folder)

        if self.path:
            folder = os.path.join(self.path, folder)

        if not os.path.exists(folder):
            logger.warning('Path \'{0}\' does not exist, creating.'.format(folder))
            try:
                os.makedirs(folder)
            except EnvironmentError as e:
                # Best-effort: log and continue; individual downloads will
                # surface their own errors if the directory is unusable.
                logger.critical('{0}'.format(str(e)))

        else:
            logger.warning('Path \'{0}\' already exist.'.format(folder))

        queue = [(self, url, folder, constant.CONFIG['proxy']) for url in queue]

        pool = multiprocessing.Pool(self.size, init_worker)
        # Plain loop (not a list comprehension) — apply_async is called for
        # its side effect only.
        for item in queue:
            pool.apply_async(download_wrapper, args=item)

        pool.close()
        pool.join()
+
+
def download_wrapper(obj, url, folder='', proxy=None):
    """Module-level pool target: run ``obj.download_`` unless an interrupt is pending.

    A zero-valued semaphore means a worker already consumed it in the SIGINT
    handler, so remaining jobs bail out with the interrupted status (-3, None).
    """
    # On darwin the semaphore value is never consulted — presumably because
    # get_value() is unsupported there (TODO confirm); downloads always proceed.
    skip = sys.platform != 'darwin' and not semaphore.get_value()
    if skip:
        return -3, None
    return Downloader.download_(obj, url=url, folder=folder, proxy=proxy)
+
+
def init_worker():
    """Pool initializer: route SIGINT in each worker process to subprocess_signal."""
    signal.signal(signal.SIGINT, subprocess_signal)
+
+
def subprocess_signal(sig, frame):
    """SIGINT handler installed in pool workers (see init_worker).

    The first worker to acquire the semaphore logs the interrupt message
    exactly once; every worker then aborts its job via KeyboardInterrupt,
    which download_wrapper/download_ translate into the -3 status.
    """
    # Parameter renamed from `signal` to `sig`: the old name shadowed the
    # stdlib `signal` module inside this function. Signal handlers are
    # invoked positionally, so the rename is interface-safe.
    if semaphore.acquire(timeout=1):
        logger.warning('Ctrl-C pressed, exiting sub processes ...')

    raise KeyboardInterrupt