import click

fp_user_agents = '/data_store_hdd/datasets/people/ibm_dif/research/user-agents.txt'


@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True, help='Input CSV file')
@click.option('-o', '--output', 'opt_fp_out', required=True, help='Output path')
@click.option('-t', '--threads', 'opt_threads', default=8, help='Number of threads')
@click.option('--slice', 'opt_slice', type=(int, int), default=(None, None), help='Slice list of files')
@click.option('--agents', 'opt_fp_agents', default=fp_user_agents, help='Path to user-agent list')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_slice, opt_threads, opt_fp_agents):
    """Threaded image/file downloader.

    CSV should be formatted as

    |url|filepath|
    |---|---|
    |https://site.com/photo.jpg|myfolder/myname.jpg|

    Failed downloads are marked with '<filepath>_error.txt' files so they
    are skipped on subsequent runs.
    """
    from os.path import join
    from pathlib import Path
    from multiprocessing.dummy import Pool as ThreadPool
    import urllib.request
    from random import randint

    import pandas as pd
    from tqdm import tqdm

    from app.utils.logger_utils import Logger

    log = Logger.getLogger()

    url_prefix = 'https://dataviz.nbcnews.com/projects/20190306-ibm-flickr-usernames/data/'

    # load the pool of user-agent strings used to randomize request headers
    with open(opt_fp_agents, 'r') as fp:
        user_agents = [x.strip() for x in fp.readlines()]

    # threaded worker: download one item, mark failures with an empty sidecar file
    def pool_process(item):
        fp_out = item['filepath']
        try:
            # download file using this item's randomly assigned user agent
            opener = urllib.request.build_opener()
            opener.addheaders = [('User-agent', item['user_agent'])]
            urllib.request.install_opener(opener)
            urllib.request.urlretrieve(item['url'], fp_out)
            item['status'] = True
        except Exception as e:
            log.debug(f'Failed: user: {item["username"]}, url: {item["url"]}')
            if str(e) != 'HTTP Error 403: Forbidden':
                log.debug(f'Error: {e}')
            # write an empty error marker so this item is skipped on reruns
            fp_error = f'{fp_out}_error.txt'
            with open(fp_error, 'w') as fp_err:
                fp_err.write('')
            item['status'] = False
        pbar.update(1)
        return item

    # load and deduplicate records
    log.debug(f'loading {opt_fp_in}')
    df_records = pd.read_csv(opt_fp_in)
    if opt_slice != (None, None):
        df_records = df_records[opt_slice[0]:opt_slice[1]]
    log.debug(f'loaded {len(df_records):,} csv records')

    log.debug('deduplicating')
    df_records = df_records.drop_duplicates(subset='sha256', keep='last')
    log.debug(f'unique records {len(df_records):,}')

    records = df_records.to_dict('records')
    log.debug(f'loaded {len(records):,} items')

    # build the work queue, skipping files that already exist or already failed
    pool_items = []
    n_skipped = 0
    n_valids = 0
    n_errors = 0

    for x in tqdm(records):
        sha256 = x['sha256']
        username = x['username']
        fp_dst = join(opt_fp_out, f'{sha256}.json')
        fp_dst_is_file = Path(fp_dst).is_file()
        fp_dst_is_err = Path(f'{fp_dst}_error.txt').is_file()
        if fp_dst_is_file:
            n_valids += 1
        elif fp_dst_is_err:
            n_errors += 1
        if not (fp_dst_is_file or fp_dst_is_err):
            url = url_prefix + sha256 + '.json'
            user_agent = user_agents[randint(0, len(user_agents) - 1)]
            pool_items.append({'url': url, 'username': username, 'filepath': fp_dst, 'user_agent': user_agent})
        else:
            n_skipped += 1

    num_items = len(pool_items)
    log.info(f'Error files: {n_errors:,} items')
    log.info(f'Valid files: {n_valids:,} items')
    log.info(f'skipping {n_skipped:,} items')
    log.info(f'processing {num_items:,} items')

    pool_results = []
    # too many records for RAM
    del records

    # run the thread pool with a shared progress bar
    pool = ThreadPool(opt_threads)
    with tqdm(total=num_items) as pbar:
        pool_results = pool.map(pool_process, pool_items)
    pool.close()
    pool.join()
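
# Example invocation (a sketch: the module name 'download_json.py' and the
# __main__ guard below are assumptions; omit the guard if this command is
# registered in a larger click group elsewhere in the app):
#
#   python download_json.py -i records.csv -o /data/json -t 16 --slice 0 1000
#
# The input CSV must contain 'sha256' and 'username' columns; each record is
# fetched from url_prefix + '<sha256>.json' and saved to <output>/<sha256>.json.
if __name__ == '__main__':
    cli()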