summaryrefslogtreecommitdiff
path: root/megapixels/commands/datasets/decrypt_ibm.py
diff options
context:
space:
mode:
authoradamhrv <adam@ahprojects.com>2019-03-19 12:20:38 +0100
committeradamhrv <adam@ahprojects.com>2019-03-19 12:20:38 +0100
commit53f6e26015e65b8696ed1a6e5c74bdfef14b3ac2 (patch)
tree8bf8b0019ff604b2165bc66e3b5deaba355b46af /megapixels/commands/datasets/decrypt_ibm.py
parent389f1f162720b577fcc652c95620eadd5e77ec43 (diff)
add cmds
Diffstat (limited to 'megapixels/commands/datasets/decrypt_ibm.py')
-rw-r--r--megapixels/commands/datasets/decrypt_ibm.py100
1 files changed, 100 insertions, 0 deletions
diff --git a/megapixels/commands/datasets/decrypt_ibm.py b/megapixels/commands/datasets/decrypt_ibm.py
new file mode 100644
index 00000000..d25c879a
--- /dev/null
+++ b/megapixels/commands/datasets/decrypt_ibm.py
@@ -0,0 +1,100 @@
+import click
+import subprocess
+
# Default I/O paths for the IBM DiF research dataset.
# All three can be overridden via the CLI options below;
# fp_dir_json holds the per-sha256 JSON files used to pre-filter records.
fp_in = '/data_store_hdd/datasets/people/ibm_dif/research/username_sha.csv'
fp_out = '/data_store_hdd/datasets/people/ibm_dif/research/ibm_dif_metadata.csv'
fp_dir_json = '/data_store_hdd/datasets/people/ibm_dif/research/valid_files/'
+
@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True, default=fp_in,
  help='Input CSV file')
@click.option('-o', '--output', 'opt_fp_out', required=True, default=fp_out,
  help='Output path')
@click.option('-t', '--threads', 'opt_threads', default=8,
  help='Number of threads')
@click.option('--slice', 'opt_slice', type=(int, int), default=(None, None),
  help='Slice list of files')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_slice):
  """Threaded Flickr metadata decryption

  Reads a CSV of (sha256, username) pairs, shells out to a Node.js
  decryption CLI for each record that has a matching JSON file in
  fp_dir_json, and writes the successfully decrypted rows to a CSV.

  CSV should be formatted as

  |sha256|username|
  |---|---|
  |123456789|mruser|
  """

  from os.path import join
  from pathlib import Path
  from multiprocessing.dummy import Pool as ThreadPool
  import json

  import pandas as pd
  from tqdm import tqdm
  from app.utils.logger_utils import Logger

  log = Logger.getLogger()

  # threaded worker: decrypt one record via the external Node CLI
  def pool_process(item):
    try:
      cmd = ['/home/adam/.nvm/versions/node/v9.9.0/bin/node',
             '/data_store/datasets/people/ibm_dif/web_files/decrypt_cli',
             item['username'], item['sha256']]
      proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
      stdout, _stderr = proc.communicate()
      data = json.loads(stdout.decode())
      item['count'] = int(data['count'])
      item['url'] = data['url']
      item['status'] = True
    except Exception as ex:
      # best-effort: mark the record failed and continue; failed rows
      # are filtered out before saving
      log.debug(f'Error: {ex}')
      item['status'] = False
    pbar.update(1)
    return item

  # load records; the default slice (None, None) is a no-op full slice
  log.debug(f'loading {opt_fp_in}')
  records = pd.read_csv(opt_fp_in).to_dict('records')
  records = records[opt_slice[0]:opt_slice[1]]
  log.debug(f'Processing {len(records):,}')

  # keep only records whose per-sha256 JSON file exists on disk
  pool_items = []
  for record in records:
    fp_json = join(fp_dir_json, f"{record['sha256']}.json")
    if Path(fp_json).is_file():
      pool_items.append(record)

  # too many records for RAM
  del records
  num_items = len(pool_items)
  log.info(f'processing {num_items:,} items')

  # run the thread pool under a single progress bar
  # (previously an extra tqdm instance was created and leaked, and the
  # pool was never closed/joined)
  pool = ThreadPool(opt_threads)
  with tqdm(total=num_items) as pbar:
    pool_results = pool.map(pool_process, pool_items)
  pool.close()
  pool.join()

  # keep successful rows only, drop the bookkeeping column, and save
  df_results = pd.DataFrame.from_dict(pool_results)
  df_results = df_results[df_results['status'] == True]
  df_results.drop(['status'], axis=1, inplace=True)
  df_results.to_csv(opt_fp_out, index=False)
  log.debug(f'Saved file to: {opt_fp_out}')
  # only successful items carry a 'count' key; summing over all results
  # unconditionally raised KeyError for failed rows
  total = sum(int(x['count']) for x in pool_results if x.get('status'))
  log.debug(f'Total: {total:,}')