import click
import subprocess

# Default I/O locations for the IBM DiF decryption run.
fp_in = '/data_store_hdd/datasets/people/ibm_dif/research/username_sha.csv'
fp_out = '/data_store_hdd/datasets/people/ibm_dif/research/ibm_dif_metadata.csv'
fp_dir_json = '/data_store_hdd/datasets/people/ibm_dif/research/valid_files/'

@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True, default=fp_in,
  help='Input CSV file')
@click.option('-o', '--output', 'opt_fp_out', required=True, default=fp_out,
  help='Output path')
@click.option('-t', '--threads', 'opt_threads', default=8,
  help='Number of threads')
@click.option('--slice', 'opt_slice', type=(int, int), default=(None, None),
  help='Slice list of files')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_slice):
  """Threaded Flickr metadata decryption.

  Reads a CSV with columns:

    |sha256|username|
    |---|---|
    |123456789|mruser|

  For each row whose `{sha256}.json` marker file exists in `fp_dir_json`,
  runs the node `decrypt_cli` tool to recover `count` and `url`, then
  writes the successful records to `opt_fp_out` as CSV.
  """

  from os.path import join
  from pathlib import Path
  from multiprocessing.dummy import Pool as ThreadPool
  import json

  import pandas as pd
  from tqdm import tqdm
  from app.utils.logger_utils import Logger

  log = Logger.getLogger()

  # Thread worker: decrypt one record via the external node CLI.
  # Mutates and returns `item`, adding 'count', 'url', and 'status'.
  def pool_process(item):
    try:
      cmd = ['/home/adam/.nvm/versions/node/v9.9.0/bin/node',
        '/data_store/datasets/people/ibm_dif/web_files/decrypt_cli',
        item['username'], item['sha256']]
      proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
      out, _err = proc.communicate()
      data = json.loads(out.decode())
      item['count'] = int(data['count'])
      item['url'] = data['url']
      item['status'] = True
    except Exception as e:
      # Best-effort: a failed decrypt is logged and flagged, not fatal
      log.debug(f'Error: {e}')
      item['status'] = False
    pbar.update(1)
    return item

  # Load records; only slice when the user actually supplied a range
  # (the default (None, None) previously always triggered a no-op slice)
  log.debug(f'loading {opt_fp_in}')
  records = pd.read_csv(opt_fp_in).to_dict('records')
  if opt_slice != (None, None):
    records = records[opt_slice[0]:opt_slice[1]]
  log.debug(f'Processing {len(records):,}')

  # Keep only records whose JSON marker file exists
  pool_items = [r for r in records
    if Path(join(fp_dir_json, f"{r['sha256']}.json")).is_file()]

  # too many records for RAM
  del records
  num_items = len(pool_items)
  log.info(f'processing {num_items:,} items')

  # Run the thread pool with a single progress bar (the worker closes
  # over `pbar`); close/join the pool so threads are reaped.
  pool = ThreadPool(opt_threads)
  try:
    with tqdm(total=num_items) as pbar:
      pool_results = pool.map(pool_process, pool_items)
  finally:
    pool.close()
    pool.join()

  # Keep only successful rows and drop the bookkeeping column
  df_results = pd.DataFrame.from_dict(pool_results)
  df_results = df_results[df_results.status]
  df_results.drop(['status'], axis=1, inplace=True)
  df_results.to_csv(opt_fp_out, index=False)
  log.debug(f'Saved file to: {opt_fp_out}')

  # Only successful items carry a 'count' key; failed ones would KeyError
  total = sum(int(x['count']) for x in pool_results if x.get('status'))
  log.debug(f'Total: {total:,}')