-#coding: utf-8
-import os
+# coding: utf-8
+
+import multiprocessing
+import signal
+
+from future.builtins import str as text
import sys
-import socket
-import threading
-import Queue
+import os
import requests
-from urlparse import urlparse
+import time
+
+try:
+ from urllib.parse import urlparse
+except ImportError:
+ from urlparse import urlparse
+
+from nhentai import constant
from nhentai.logger import logger
+from nhentai.parser import request
+from nhentai.utils import Singleton
+requests.packages.urllib3.disable_warnings()
+semaphore = multiprocessing.Semaphore(1)
-# global timeout
-timeout = 30
-socket.setdefaulttimeout(timeout)
+class NHentaiImageNotExistException(Exception):
+ pass
-class Downloader(object):
- _instance = None
- kill_received = False
- def __new__(cls, *args, **kwargs):
- if not cls._instance:
- cls._instance = super(Downloader, cls).__new__(cls, *args, **kwargs)
- return cls._instance
+class Downloader(Singleton):
- def __init__(self, path='', thread=1):
- if not isinstance(thread, (int, )) or thread < 1 or thread > 10:
- raise ValueError('Invalid threads count')
+ def __init__(self, path='', size=5, timeout=30, delay=0):
+ self.size = size
self.path = str(path)
- self.thread_count = thread
- self.threads = []
-
- def _download(self, url, folder='', filename=''):
- if not os.path.exists(folder):
- try:
- os.mkdir(folder)
- except os.error, e:
- logger.error('%s error %s' % (threading.currentThread().getName(), str(e)))
- sys.exit()
+ self.timeout = timeout
+ self.delay = delay
+ def download_(self, url, folder='', filename='', retried=0, proxy=None):
+ if self.delay:
+ time.sleep(self.delay)
+ logger.info('Starting to download {0} ...'.format(url))
filename = filename if filename else os.path.basename(urlparse(url).path)
+ base_filename, extension = os.path.splitext(filename)
try:
- with open(os.path.join(folder, filename), "wb") as f:
- response = requests.get(url, stream=True, timeout=timeout)
+ if os.path.exists(os.path.join(folder, base_filename.zfill(3) + extension)):
+ logger.warning('File: {0} exists, ignoring'.format(os.path.join(folder, base_filename.zfill(3) +
+ extension)))
+ return 1, url
+
+ response = None
+ with open(os.path.join(folder, base_filename.zfill(3) + extension), "wb") as f:
+ i = 0
+ while i < 10:
+ try:
+ response = request('get', url, stream=True, timeout=self.timeout, proxies=proxy)
+ if response.status_code != 200:
+ raise NHentaiImageNotExistException
+
+ except NHentaiImageNotExistException as e:
+ raise e
+
+ except Exception as e:
+ i += 1
+ if not i < 10:
+ logger.critical(str(e))
+ return 0, None
+ continue
+
+ break
+
length = response.headers.get('content-length')
if length is None:
f.write(response.content)
else:
for chunk in response.iter_content(2048):
f.write(chunk)
- except (os.error, IOError), e:
- logger.error('%s error %s' % (threading.currentThread().getName(), str(e)))
- sys.exit()
- except Exception, e:
- raise e
- logger.info('%s %s downloaded.' % (threading.currentThread().getName(), url))
-
- def _download_thread(self, queue, folder=''):
- while not self.kill_received:
- if queue.empty():
- queue.task_done()
- break
- try:
- url = queue.get(False)
- logger.info('%s downloading: %s ...' % (threading.currentThread().getName(), url))
- self._download(url, folder)
- except Queue.Empty:
- break
+
+ except (requests.HTTPError, requests.Timeout) as e:
+ if retried < 3:
+ logger.warning('Warning: {0}, retrying({1}) ...'.format(str(e), retried))
+ return 0, self.download_(url=url, folder=folder, filename=filename,
+ retried=retried+1, proxy=proxy)
+ else:
+ return 0, None
+
+ except NHentaiImageNotExistException as e:
+ os.remove(os.path.join(folder, base_filename.zfill(3) + extension))
+ return -1, url
+
+ except Exception as e:
+ import traceback
+ traceback.print_stack()
+ logger.critical(str(e))
+ return 0, None
+
+ except KeyboardInterrupt:
+ return -3, None
+
+ return 1, url
+
+ def _download_callback(self, result):
+ result, data = result
+ if result == 0:
+ logger.warning('fatal errors occurred, ignored')
+ # exit(1)
+ elif result == -1:
+ logger.warning('url {} return status code 404'.format(data))
+ elif result == -2:
+ logger.warning('Ctrl-C pressed, exiting sub processes ...')
+ elif result == -3:
+ # workers won't be run, just pass
+ pass
+ else:
+ logger.log(15, '{0} downloaded successfully'.format(data))
def download(self, queue, folder=''):
- if not isinstance(folder, (str, unicode)):
+ if not isinstance(folder, text):
folder = str(folder)
if self.path:
folder = os.path.join(self.path, folder)
- if os.path.exists(path=folder):
- logger.warn('Path \'%s\' already exist' % folder)
+ if not os.path.exists(folder):
+ logger.warning('Path \'{0}\' does not exist, creating.'.format(folder))
+ try:
+ os.makedirs(folder)
+ except EnvironmentError as e:
+ logger.critical('{0}'.format(str(e)))
+
else:
- logger.warn('Path \'%s\' not exist' % folder)
+ logger.warning('Path \'{0}\' already exist.'.format(folder))
- for i in range(self.thread_count):
- _ = threading.Thread(target=self._download_thread, args=(queue, folder, ))
- _.setDaemon(True)
- self.threads.append(_)
+ queue = [(self, url, folder, constant.CONFIG['proxy']) for url in queue]
- for thread in self.threads:
- thread.start()
+ pool = multiprocessing.Pool(self.size, init_worker)
+ [pool.apply_async(download_wrapper, args=item) for item in queue]
- while len(self.threads) > 0:
- try:
- self.threads = [t.join(1) for t in self.threads if t is not None and t.isAlive()]
- except KeyboardInterrupt:
- logger.warning('Ctrl-C received, sending kill signal.')
- self.kill_received = True
+ pool.close()
+ pool.join()
+
+
+def download_wrapper(obj, url, folder='', proxy=None):
+ if sys.platform == 'darwin' or semaphore.get_value():
+ return Downloader.download_(obj, url=url, folder=folder, proxy=proxy)
+ else:
+ return -3, None
+
+
+def init_worker():
+ signal.signal(signal.SIGINT, subprocess_signal)
+
+
+def subprocess_signal(signal, frame):
+ if semaphore.acquire(timeout=1):
+ logger.warning('Ctrl-C pressed, exiting sub processes ...')
- # clean threads list
- self.threads = []
+ raise KeyboardInterrupt