Diffstat (limited to 'megapixels/commands/datasets/download_ibmdif.py')
 megapixels/commands/datasets/download_ibmdif.py | 121 ++++++++++++++++++++++++
 1 file changed, 121 insertions(+), 0 deletions(-)
diff --git a/megapixels/commands/datasets/download_ibmdif.py b/megapixels/commands/datasets/download_ibmdif.py
new file mode 100644
index 00000000..ed717662
--- /dev/null
+++ b/megapixels/commands/datasets/download_ibmdif.py
@@ -0,0 +1,121 @@
+import click
+
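+# default path to a newline-delimited list of User-Agent strings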
+fp_user_agents = '/data_store_hdd/datasets/people/ibm_dif/research/user-agents.txt'
+
+@click.command()
+@click.option('-i', '--input', 'opt_fp_in', required=True,
+ help='Input CSV file')
+@click.option('-o', '--output', 'opt_fp_out', required=True,
+ help='Output path')
+@click.option('-t', '--threads', 'opt_threads', default=8,
+ help='Number of threads')
+@click.option('--agents', 'opt_fp_agents', default=fp_user_agents)
+@click.pass_context
+def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_fp_agents):
+ """Threaded image/file downloader"""
+
+ """
+ CSV should be formatted as
+
+ |url|filepath|
+ |---|---|
+ |https:/site.com/photo.jpg|myfolder/myname.jpg|
+
+ Saves logfile.csv output and uses for errors
+ """
+
+  from os.path import join
+  from pathlib import Path
+  from multiprocessing.dummy import Pool as ThreadPool
+  import urllib.request
+  from random import choice
+
+ import pandas as pd
+ from tqdm import tqdm
+ from app.utils.logger_utils import Logger
+
+ log = Logger.getLogger()
+
+ url_prefix = 'https://dataviz.nbcnews.com/projects/20190306-ibm-flickr-usernames/data/'
+
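+  # load a pool of User-Agent strings; one is chosen per request to
+  # spread requests across different client identities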
+  with open(opt_fp_agents, 'r') as fp:
+    user_agents = [line.strip() for line in fp if line.strip()]
+
+
+ # setup multithreading function
+  def pool_process(item):
+    # worker: download one record to item['filepath']
+    fp_out = item['filepath']
+    try:
+      # install_opener() mutates global state and is not thread-safe;
+      # pass per-request headers instead
+      req = urllib.request.Request(item['url'], headers={'User-Agent': item['user_agent']})
+      with urllib.request.urlopen(req) as resp, open(fp_out, 'wb') as f:
+        f.write(resp.read())
+ item['status'] = True
+ except Exception as e:
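+      # the common 403 rejections are expected and skipped from logging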
+ if str(e) != 'HTTP Error 403: Forbidden':
+ log.debug(f'Error: {e}')
+ fp_error = f'{fp_out}_error.txt'
+ with open(fp_error, 'w') as fp:
+ fp.write('')
+ item['status'] = False
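+      # pbar is created in cli() before the pool starts, so this closure resolves safely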
+ pbar.update(1)
+ return item
+
+ # setup multithreading data holders
+ log.debug(f'loading {opt_fp_in}')
+ df_records = pd.read_csv(opt_fp_in)
+ log.debug(f'loaded {len(df_records):,} csv records')
+ log.debug('deduplicating')
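+  # sha256 is both the dedup key and the remote filename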
+ df_records = df_records.drop_duplicates(subset='sha256', keep="last")
+ log.debug(f'unique records {len(df_records):,}')
+ records = df_records.to_dict('records')
+ log.debug(f'loaded {len(records):,} items')
+
+ pool_items = []
+ n_skipped = 0
+ n_valids = 0
+ n_errors = 0
+
+ for x in tqdm(records):
+ sha256 = x['sha256']
+
+ fp_dst = join(opt_fp_out, f"{sha256}.json")
+ fp_dst_is_file = Path(fp_dst).is_file()
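+    # an empty '<fp_dst>_error.txt' marker means a previous run failed here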
+ fp_dst_is_err = Path(f'{fp_dst}_error.txt').is_file()
+
+    if fp_dst_is_file:
+      n_valids += 1
+      n_skipped += 1
+    elif fp_dst_is_err:
+      n_errors += 1
+      n_skipped += 1
+    else:
+      url = url_prefix + sha256 + '.json'
+      # choice() avoids the off-by-one bias of randint(0, len(...)) - 1
+      user_agent = choice(user_agents)
+      pool_items.append({'url': url, 'filepath': fp_dst, 'user_agent': user_agent})
+
+ num_items = len(pool_items)
+ log.info(f'Error files: {n_errors:,} items')
+ log.info(f'Valid files: {n_valids:,} items')
+  log.info(f'Skipping {n_skipped:,} items')
+  log.info(f'Processing {num_items:,} items')
+  # the full record list can be large; free it before downloading
+  del records
+
+  # run the downloads across the thread pool with a progress bar;
+  # the context managers close both the pool and the bar
+  with tqdm(total=num_items) as pbar:
+    with ThreadPool(opt_threads) as pool:
+      pool_results = pool.map(pool_process, pool_items)
+
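+  # summarize outcomes using the per-item 'status' set in pool_process
+  n_failed = sum(1 for r in pool_results if not r['status'])
+  log.info(f'Done: {num_items - n_failed:,} downloaded, {n_failed:,} failed')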
+