Diffstat (limited to 'megapixels/commands/datasets/download_ibmdif.py')
-rw-r--r--  megapixels/commands/datasets/download_ibmdif.py | 87
1 file changed, 87 insertions, 0 deletions
diff --git a/megapixels/commands/datasets/download_ibmdif.py b/megapixels/commands/datasets/download_ibmdif.py
new file mode 100644
index 00000000..48aca5f0
--- /dev/null
+++ b/megapixels/commands/datasets/download_ibmdif.py
@@ -0,0 +1,87 @@
+import click
+
+fp_user_agents = '/data_store_hdd/datasets/people/ibm_dif/research/user-agents.txt'
+
+@click.command()
+@click.option('-i', '--input', 'opt_fp_in', required=True,
+  help='Input CSV file')
+@click.option('-o', '--output', 'opt_fp_out', required=True,
+  help='Output path')
+@click.option('-t', '--threads', 'opt_threads', default=8,
+  help='Number of threads')
+@click.option('--agents', 'opt_fp_agents', default=fp_user_agents,
+  help='Path to a newline-delimited file of user agent strings')
+@click.pass_context
+def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_fp_agents):
+  """Threaded downloader for IBM DiF JSON metadata.
+
+  The input CSV must contain a `sha256` column. Each hash is
+  fetched from url_prefix + sha256 + '.json' and saved into the
+  output directory. Failed downloads leave an `*_error.txt`
+  marker file so they are skipped on re-runs.
+  """
+
+  from os.path import join
+  from pathlib import Path
+  from multiprocessing.dummy import Pool as ThreadPool
+  import urllib.request
+  from random import choice
+
+  import pandas as pd
+  from tqdm import tqdm
+  from app.utils.logger_utils import Logger
+
+  log = Logger.getLogger()
+
+  url_prefix = 'https://dataviz.nbcnews.com/projects/20190306-ibm-flickr-usernames/data/'
+
+  # load user agent strings, one per line
+  with open(opt_fp_agents, 'r') as fp:
+    user_agents = [x.strip() for x in fp.readlines()]
+
+  def pool_process(item):
+    # threaded worker: download one file with a randomized user agent
+    fp_out = item['filepath']
+    try:
+      opener = urllib.request.build_opener()
+      opener.addheaders = [('User-agent', item['user_agent'])]
+      urllib.request.install_opener(opener)
+      urllib.request.urlretrieve(item['url'], fp_out)
+      item['status'] = True
+    except Exception as e:
+      if str(e) != 'HTTP Error 403: Forbidden':
+        log.debug(f'Error: {e}')
+      # write an empty error marker so this file is skipped on re-runs
+      fp_error = f'{fp_out}_error.txt'
+      with open(fp_error, 'w') as fp:
+        fp.write('')
+      item['status'] = False
+    pbar.update(1)
+    return item
+
+  # build the worklist, skipping files already downloaded or errored
+  log.debug(f'loading {opt_fp_in}')
+  records = pd.read_csv(opt_fp_in).to_dict('records')
+
+  pool_items = []
+  for x in tqdm(records):
+    fp_dst = join(opt_fp_out, x['sha256'] + '.json')
+    fp_dst_is_file = Path(fp_dst).is_file()
+    fp_dst_is_err = Path(f'{fp_dst}_error.txt').is_file()
+    if not fp_dst_is_file and not fp_dst_is_err:
+      url = url_prefix + x['sha256'] + '.json'
+      user_agent = choice(user_agents)
+      pool_items.append({'url': url, 'filepath': fp_dst, 'user_agent': user_agent})
+
+  num_items = len(pool_items)
+  log.info(f'processing {num_items:,} items')
+
+  # too many records for RAM
+  del records
+
+  # run the thread pool with a shared progress bar
+  pool = ThreadPool(opt_threads)
+  with tqdm(total=num_items) as pbar:
+    pool_results = pool.map(pool_process, pool_items)
+  pool.close()
+  pool.join()
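
For reference, the snippet below is a minimal smoke-test sketch of the new command using click's CliRunner. It is not part of the commit: the import path simply mirrors the file path in the diff, and the CSV, output directory, and user-agent file are hypothetical fixtures created on the fly (the all-zero sha256 is a dummy value, so the request should fail and exercise the error-marker path rather than fetch real data).

import os
import tempfile

import pandas as pd
from click.testing import CliRunner

from megapixels.commands.datasets.download_ibmdif import cli  # assumed import path

# hypothetical fixtures: a one-row CSV with the required sha256 column
# and a tiny user-agent list to override the machine-specific default
tmp_dir = tempfile.mkdtemp()
fp_csv = os.path.join(tmp_dir, 'records.csv')
pd.DataFrame({'sha256': ['0' * 64]}).to_csv(fp_csv, index=False)
fp_agents = os.path.join(tmp_dir, 'user-agents.txt')
with open(fp_agents, 'w') as f:
  f.write('Mozilla/5.0 (smoke test)\n')

runner = CliRunner()
result = runner.invoke(cli, ['-i', fp_csv, '-o', tmp_dir, '-t', '2', '--agents', fp_agents])
print(result.exit_code, result.output)

Passing --agents explicitly matters here because the default points at a hardcoded /data_store_hdd path that only exists on the author's machine.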
