summaryrefslogtreecommitdiff
path: root/megapixels/commands/datasets/decrypt_ibm.py
blob: d25c879ae9614633ac75a29e3c7849b4cfa7cd1b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import click
import subprocess

# Default path of the input CSV (used as the default of the -i/--input option).
fp_in = '/data_store_hdd/datasets/people/ibm_dif/research/username_sha.csv'
# Default path of the output CSV (used as the default of the -o/--output option).
fp_out = '/data_store_hdd/datasets/people/ibm_dif/research/ibm_dif_metadata.csv'
# Directory checked for a '<sha256>.json' file for each input record;
# records without such a file are not processed.
fp_dir_json = '/data_store_hdd/datasets/people/ibm_dif/research/valid_files/'

@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True, default=fp_in,
  help='Input CSV file')
@click.option('-o', '--output', 'opt_fp_out', required=True, default=fp_out,
  help='Output path')
@click.option('-t', '--threads', 'opt_threads', default=8,
  help='Number of threads')
@click.option('--slice', 'opt_slice', type=(int, int), default=(None, None),
  help='Slice list of files')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_slice):
  """Threaded Flickr metadata decryption"""
  # NOTE: the docstring is kept to a single line because click uses it as the
  # command's help text; the details live in these comments instead.
  #
  # Parameters
  #   ctx         click context injected by @click.pass_context (unused here)
  #   opt_fp_in   path of the input CSV, formatted as
  #
  #                 |sha256|username|
  #                 |---|---|
  #                 |123456789|mruser|
  #
  #   opt_fp_out  path the resulting CSV is written to
  #   opt_threads number of worker threads in the pool
  #   opt_slice   (start, stop) pair applied as records[start:stop];
  #               the default (None, None) keeps every record
  #
  # Each record that has a matching '<sha256>.json' file in fp_dir_json is
  # passed to an external node script. Records that decrypt successfully are
  # written to opt_fp_out with the added 'count' and 'url' columns.

  from os.path import join
  from pathlib import Path
  from multiprocessing.dummy import Pool as ThreadPool
  import json

  import pandas as pd
  from tqdm import tqdm
  from app.utils.logger_utils import Logger

  log = Logger.getLogger()

  def pool_process(item):
    # Worker run in the thread pool: decrypts one record in place and marks
    # the outcome in item['status'] (True on success, False on any error).
    try:
      # decrypt by running the node CLI with the username and sha256
      cmd = ['/home/adam/.nvm/versions/node/v9.9.0/bin/node', 
        '/data_store/datasets/people/ibm_dif/web_files/decrypt_cli', 
        item['username'], item['sha256']]
      proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
      stdout, _ = proc.communicate()
      data = json.loads(stdout.decode())
      # read both values before touching the record, so a failed record
      # never ends up half-populated
      count = int(data['count'])
      url = data['url']
      item['count'] = count
      item['url'] = url
      item['status'] = True
    except Exception as e:
      # best effort: a failing record is logged and skipped, not fatal
      log.debug(f'Error: {e}')
      item['status'] = False
    # pbar is bound in the enclosing scope before the pool starts
    pbar.update(1)
    return item

  # load the records and apply the optional slice
  log.debug(f'loading {opt_fp_in}')
  records = pd.read_csv(opt_fp_in).to_dict('records')
  if opt_slice:
    records = records[opt_slice[0]:opt_slice[1]]
  log.debug(f'Processing {len(records):,}')

  # only keep records that have a JSON file in fp_dir_json
  pool_items = [
    record for record in records
    if Path(join(fp_dir_json, f"{record['sha256']}.json")).is_file()
  ]

  # too many records for RAM
  del records
  num_items = len(pool_items)
  log.info(f'processing {num_items:,} items')

  # run the multithreading with a single progress bar; both the bar and the
  # pool are released when the block exits
  with tqdm(total=num_items) as pbar, ThreadPool(opt_threads) as pool:
    pool_results = pool.map(pool_process, pool_items)

  # failed records carry no 'count'/'url', so only successful ones are used
  # for the output file and for the total
  ok_items = [item for item in pool_results if item['status']]
  num_failed = len(pool_results) - len(ok_items)
  if num_failed:
    log.info(f'{num_failed:,} items could not be decrypted')
  if not ok_items:
    log.info('No items decrypted, nothing to save')
    return

  # building the frame from successful records only keeps 'count' an integer
  # column (missing values would turn it into floats)
  df_results = pd.DataFrame(ok_items).drop(['status'], axis=1)
  df_results.to_csv(opt_fp_out, index=False)
  log.debug(f'Saved file to: {opt_fp_out}')
  total = sum(item['count'] for item in ok_items)
  log.debug(f'Total: {total:,}')