import click


@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True, help='Input')
@click.option('-o', '--output', 'opt_fp_out', required=True, help='Output')
@click.option('-t', '--threads', 'opt_threads', default=8, show_default=True,
    help='Number of threads')
@click.option('--wayback', 'opt_wayback', is_flag=True, default=False,
    help='Check Wayback archive for URL and download cached image')
@click.option('--url', 'opt_key_url', default='url', show_default=True,
    help='Field name for URL')
@click.option('--filepath', 'opt_key_filepath', default='filepath', show_default=True,
    help='Field name for filepath')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_key_filepath, opt_key_url, opt_wayback):
    """Threaded image downloader

    CSV should be formatted as

    |url|filepath|
    |---|---|
    |https://site.com/photo.jpg|myfolder/myname.jpg|

    Saves logfile.csv output and uses it for errors
    """

    from os.path import join
    from pathlib import Path
    from multiprocessing.dummy import Pool as ThreadPool
    import urllib.request

    import pandas as pd
    from tqdm import tqdm

    from app.utils import file_utils
    from app.utils.logger_utils import Logger

    log = Logger.getLogger()

    # threaded worker: download one image and record success/failure
    def pool_process(item):
        url_wayback_base = 'https://archive.org/wayback/available?url='
        fp_out = item['filepath']
        try:
            # ensure the destination directory exists, then download the image
            file_utils.mkdirs(item['filepath'])
            urllib.request.urlretrieve(item['url'], fp_out)
            item['status'] = True
        except Exception as e:
            log.debug(f'Error: {e}, url: {item["url"]}')
            estr = str(e)
            if item['opt_wayback'] and 'HTTP Error' in estr:
                # TODO: add/parse/handle request for Wayback Machine archive
                url_wayback = url_wayback_base + item['url']
            # write an empty sidecar file so this URL is skipped on the next run
            fp_error = f'{fp_out}_error.txt'
            with open(fp_error, 'w') as fp:
                fp.write('')
            item['status'] = False
        pbar.update(1)
        return item

    # load the CSV and build the list of items that still need downloading
    log.debug(f'loading {opt_fp_in}')
    records = pd.read_csv(opt_fp_in).to_dict('records')
    pool_items = []

    log.debug('Initializing multithreaded pool...')
    for x in tqdm(records):
        fp_dst = join(opt_fp_out, x[opt_key_filepath])
        fp_dst_is_file = Path(fp_dst).is_file()
        fp_dst_is_err = Path(f'{fp_dst}_error.txt').is_file()
        # skip files that already exist or that previously errored
        if not fp_dst_is_file and not fp_dst_is_err:
            pool_items.append({'url': x[opt_key_url], 'filepath': fp_dst, 'opt_wayback': opt_wayback})

    num_items = len(pool_items)
    log.info(f'Going to download {num_items:,} files')

    # run the thread pool with a progress bar
    pool = ThreadPool(opt_threads)
    with tqdm(total=num_items) as pbar:
        pool_results = pool.map(pool_process, pool_items)
    pool.close()
    pool.join()
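    # The docstring above says results are saved to logfile.csv, but that step
    # is not shown in the code here. Below is a minimal sketch, assuming the
    # log is written alongside the downloads in opt_fp_out; the exact filename
    # and location are assumptions, not confirmed by the original code.
    df_log = pd.DataFrame(pool_results)
    fp_log = join(opt_fp_out, 'logfile.csv')  # assumed output location
    df_log.to_csv(fp_log, index=False)
    log.info(f'Wrote download log to {fp_log}')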