summaryrefslogtreecommitdiff
path: root/megapixels/commands/datasets/download_ibmdif.py
blob: ed717662e51eb870edd8111e30a07d36c9008bb8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import click

# Path to a text file of User-Agent strings, read one per line;
# used as the default value of the --agents command-line option.
fp_user_agents = '/data_store_hdd/datasets/people/ibm_dif/research/user-agents.txt'

@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True,
  help='Input CSV file')
@click.option('-o', '--output', 'opt_fp_out', required=True,
  help='Output path')
@click.option('-t', '--threads', 'opt_threads', default=8,
  help='Number of threads')
@click.option('--agents', 'opt_fp_agents', default=fp_user_agents,
  help='Text file with one User-Agent string per line')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_fp_agents):
  """Threaded image/file downloader"""

  # The input CSV must contain a `sha256` column. Each unique hash is fetched
  # from `url_prefix + <sha256>.json` and saved as `<output>/<sha256>.json`.
  # A failed download leaves an empty `<sha256>.json_error.txt` marker next to
  # the destination. Hashes that already have a file or a marker are skipped,
  # so the command can be re-run to resume where it stopped.

  from os.path import join
  from pathlib import Path
  from multiprocessing.dummy import Pool as ThreadPool
  from random import choice
  # import the submodules explicitly: a bare `import urllib` does not
  # guarantee that `urllib.request` is available as an attribute
  import urllib.error
  import urllib.request

  import pandas as pd
  from tqdm import tqdm
  from app.utils.logger_utils import Logger

  log = Logger.getLogger()

  url_prefix = 'https://dataviz.nbcnews.com/projects/20190306-ibm-flickr-usernames/data/'

  # read the agents from the --agents path so the option actually takes effect;
  # blank lines are dropped so an empty User-agent header is never sent
  with open(opt_fp_agents, 'r') as fp:
    user_agents = [line.strip() for line in fp if line.strip()]
  if not user_agents:
    raise click.ClickException(f'No user agents found in {opt_fp_agents}')

  def pool_process(item):
    # threaded function: fetch item['url'] into item['filepath'] and record
    # the outcome in item['status'] (True on success, False on failure)
    fp_out = item['filepath']
    try:
      # Use a private opener per call. install_opener() replaces a
      # process-wide opener, so calling it from several threads lets one
      # thread's User-agent leak into another thread's request.
      opener = urllib.request.build_opener()
      opener.addheaders = [('User-agent', item['user_agent'])]
      # Read the whole body before touching the destination so a failed
      # transfer cannot leave a partial file that a later run treats as valid.
      with opener.open(item['url']) as response:
        data = response.read()
      with open(fp_out, 'wb') as fp:
        fp.write(data)
      item['status'] = True
    except Exception as e:
      # HTTP 403 responses are deliberately not logged; match on the status
      # code rather than the message text, which depends on the server
      is_forbidden = isinstance(e, urllib.error.HTTPError) and e.code == 403
      if not is_forbidden:
        log.debug(f'Error: {e}')
      # empty marker file so this item is skipped on the next run
      fp_error = f'{fp_out}_error.txt'
      with open(fp_error, 'w') as fp:
        fp.write('')
      item['status'] = False
    return item

  # load and deduplicate the records
  log.debug(f'loading {opt_fp_in}')
  df_records = pd.read_csv(opt_fp_in)
  log.debug(f'loaded {len(df_records):,} csv records')
  log.debug('deduplicating')
  df_records = df_records.drop_duplicates(subset='sha256', keep="last")
  log.debug(f'unique records {len(df_records):,}')
  records = df_records.to_dict('records')
  log.debug(f'loaded {len(records):,} items')

  # without the output directory every download and every error-marker
  # write would fail, so make sure it exists up front
  Path(opt_fp_out).mkdir(parents=True, exist_ok=True)

  pool_items = []
  n_valids = 0
  n_errors = 0

  for x in tqdm(records):
    sha256 = x['sha256']
    fp_dst = join(opt_fp_out, f"{sha256}.json")

    if Path(fp_dst).is_file():
      # already downloaded
      n_valids += 1
    elif Path(f'{fp_dst}_error.txt').is_file():
      # failed on a previous run
      n_errors += 1
    else:
      pool_items.append({
        'url': url_prefix + sha256 + '.json',
        'filepath': fp_dst,
        # choice() is uniform; randint(0, n) - 1 over-weighted the last entry
        'user_agent': choice(user_agents),
      })

  n_skipped = n_valids + n_errors
  num_items = len(pool_items)
  log.info(f'Error files: {n_errors:,} items')
  log.info(f'Valid files: {n_valids:,} items')
  log.info(f'skipping {n_skipped:,} items')
  log.info(f'processing {num_items:,} items')

  # too many records for RAM
  del records

  # Run the downloads on a thread pool. The progress bar wraps the result
  # iterator, so it is advanced from this thread only, and the pool is shut
  # down when the block exits.
  with ThreadPool(opt_threads) as pool:
    pool_results = list(tqdm(pool.imap(pool_process, pool_items), total=num_items))

  n_downloaded = sum(1 for result in pool_results if result['status'])
  log.info(f'downloaded {n_downloaded:,} of {num_items:,} items')