import click
import subprocess
# default paths (machine-specific; can be overridden via the CLI options)
fp_in = '/data_store_hdd/datasets/people/ibm_dif/research/username_sha.csv'
fp_out = '/data_store_hdd/datasets/people/ibm_dif/research/ibm_dif_metadata.csv'
fp_dir_json = '/data_store_hdd/datasets/people/ibm_dif/research/valid_files/'
@click.command()
@click.option('-i', '--input', 'opt_fp_in', default=fp_in,
    help='Path to the input CSV of sha256/username pairs')
@click.option('-o', '--output', 'opt_fp_out', default=fp_out,
    help='Path to the output CSV')
@click.option('-t', '--threads', 'opt_threads', default=8,
    help='Number of worker threads')
@click.option('--slice', 'opt_slice', type=(int, int), default=(None, None),
    help='Start/end indices to slice the list of records')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_slice):
"""Threaded Flickr metadata decryption"""
"""
CSV should be formatted as
|sha256|username|
|---|---|
|123456789|mruser|
"""
    from os.path import join
    from pathlib import Path
    # dummy Pool is thread-backed, which suits this I/O-bound subprocess work
    from multiprocessing.dummy import Pool as ThreadPool
    import json

    import pandas as pd
    from tqdm import tqdm

    from app.utils.logger_utils import Logger

    log = Logger.getLogger()
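    # Note: the node decrypt_cli is expected to print a JSON object to
    # stdout. The shape shown here is an assumption inferred from the keys
    # read in pool_process below, not a documented contract:
    #   {"count": 123, "url": "https://..."}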
    # worker run by each thread: shell out to the node decrypt CLI and
    # parse its JSON stdout into the record
    def pool_process(item):
        try:
            cmd = ['/home/adam/.nvm/versions/node/v9.9.0/bin/node',
                '/data_store/datasets/people/ibm_dif/web_files/decrypt_cli',
                item['username'], item['sha256']]
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = proc.communicate()
            data = json.loads(out.decode())
            item['count'] = int(data['count'])
            item['url'] = data['url']
            item['status'] = True
        except Exception as e:
            log.debug(f'Error: {e}')
            item['status'] = False
        # pbar is a closure over the progress bar created below
        pbar.update(1)
        return item
    # load records and keep only those with a matching JSON file on disk
    log.debug(f'loading {opt_fp_in}')
    records = pd.read_csv(opt_fp_in).to_dict('records')
    if opt_slice != (None, None):
        records = records[opt_slice[0]:opt_slice[1]]
    log.debug(f'Processing {len(records):,} records')
    pool_items = []
    for record in records:
        fp_json = join(fp_dir_json, f"{record['sha256']}.json")
        if Path(fp_json).is_file():
            pool_items.append(record)
    # free the unfiltered list; it can be too large to keep in RAM
    del records
    num_items = len(pool_items)
    log.info(f'processing {num_items:,} items')
    # run the thread pool with a progress bar; pool_process updates pbar
    # through the enclosing scope
    pool = ThreadPool(opt_threads)
    with tqdm(total=num_items) as pbar:
        pool_results = pool.map(pool_process, pool_items)
    pool.close()
    pool.join()
    # keep only the successful lookups and write them out
    df_results = pd.DataFrame.from_dict(pool_results)
    df_results = df_results[df_results['status']]
    df_results = df_results.drop(columns=['status'])
    df_results.to_csv(opt_fp_out, index=False)
    log.debug(f'Saved file to: {opt_fp_out}')
    # sum counts from successful rows only; failed items have no 'count' key
    total = int(df_results['count'].sum())
    log.debug(f'Total: {total:,}')


if __name__ == '__main__':
    cli()
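# Example invocation (a sketch; the script filename and the slice bounds are
# hypothetical, the flags match the options defined above):
#   python decrypt_metadata.py -i username_sha.csv -o metadata.csv \
#       -t 16 --slice 0 1000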