import subprocess

import click

fp_in = '/data_store_hdd/datasets/people/ibm_dif/research/username_sha.csv'
fp_out = '/data_store_hdd/datasets/people/ibm_dif/research/ibm_dif_metadata.csv'
fp_dir_json = '/data_store_hdd/datasets/people/ibm_dif/research/valid_files/'


@click.command()
@click.option('-i', '--input', 'opt_fp_in', default=fp_in,
              help='Input CSV file')
@click.option('-o', '--output', 'opt_fp_out', default=fp_out,
              help='Output path')
@click.option('-t', '--threads', 'opt_threads', default=8,
              help='Number of threads')
@click.option('--slice', 'opt_slice', type=(int, int), default=(None, None),
              help='Slice list of files')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_slice):
    """Threaded Flickr metadata decryption.

    The input CSV should be formatted as:

        |sha256|username|
        |---|---|
        |123456789|mruser|
    """
    import json
    from multiprocessing.dummy import Pool as ThreadPool
    from os.path import join
    from pathlib import Path

    import pandas as pd
    from tqdm import tqdm

    from app.utils.logger_utils import Logger

    log = Logger.getLogger()

    def pool_process(item):
        # threaded worker: decrypt one record by shelling out to the Node CLI
        try:
            cmd = ['/home/adam/.nvm/versions/node/v9.9.0/bin/node',
                   '/data_store/datasets/people/ibm_dif/web_files/decrypt_cli',
                   item['username'], item['sha256']]
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, _ = proc.communicate()
            data = json.loads(out.decode())
            item['count'] = int(data['count'])
            item['url'] = data['url']
            item['status'] = True
        except Exception as e:
            log.debug(f'Error: {e}')
            item['status'] = False
        # tqdm guards updates with an internal lock, so this is thread-safe
        pbar.update(1)
        return item

    # load records, optionally slicing for partial runs
    log.debug(f'loading {opt_fp_in}')
    records = pd.read_csv(opt_fp_in).to_dict('records')
    if opt_slice != (None, None):
        records = records[opt_slice[0]:opt_slice[1]]
    log.debug(f'Processing {len(records):,} records')

    # keep only records whose decrypted JSON file exists
    pool_items = []
    for record in records:
        fp_json = join(fp_dir_json, f"{record['sha256']}.json")
        if Path(fp_json).is_file():
            pool_items.append(record)

    # too many records for RAM
    del records

    num_items = len(pool_items)
    log.info(f'processing {num_items:,} items')

    # run the thread pool with a single shared progress bar
    pool = ThreadPool(opt_threads)
    with tqdm(total=num_items) as pbar:
        pool_results = pool.map(pool_process, pool_items)
    pool.close()
    pool.join()

    # keep successful items, drop the status column, and save
    df_results = pd.DataFrame(pool_results)
    df_results = df_results[df_results['status']]
    df_results.drop(columns=['status'], inplace=True)
    df_results.to_csv(opt_fp_out, index=False)
    log.debug(f'Saved file to: {opt_fp_out}')

    # failed items never get a 'count' key, so total only over successful rows
    total = int(df_results['count'].sum())
    log.debug(f'Total: {total:,}')
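
# Entry point with an example invocation: a minimal sketch assuming this
# script is run directly. In the original repo, `cli` may instead be wired
# into a parent Click group, in which case this guard is unnecessary.
# The filename below is hypothetical:
#
#   python decrypt_metadata.py -i username_sha.csv -o ibm_dif_metadata.csv \
#       -t 16 --slice 0 10000
if __name__ == '__main__':
    cli()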