import click

# default path to a newline-delimited list of User-Agent strings
fp_user_agents = '/data_store_hdd/datasets/people/ibm_dif/research/user-agents.txt'

@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True,
help='Input CSV file')
@click.option('-o', '--output', 'opt_fp_out', required=True,
help='Output path')
@click.option('-t', '--threads', 'opt_threads', default=8,
help='Number of threads')
@click.option('--agents', 'opt_fp_agents', default=fp_user_agents,
  help='Path to newline-delimited file of User-Agent strings')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_fp_agents):
"""Threaded image/file downloader"""
"""
CSV should be formatted as
|url|filepath|
|---|---|
|https:/site.com/photo.jpg|myfolder/myname.jpg|
Saves logfile.csv output and uses for errors
"""
  from os.path import join
  from pathlib import Path
  from multiprocessing.dummy import Pool as ThreadPool
  import urllib.request
  from random import randint

  import pandas as pd
  from tqdm import tqdm
from app.utils.logger_utils import Logger
log = Logger.getLogger()
url_prefix = 'https://dataviz.nbcnews.com/projects/20190306-ibm-flickr-usernames/data/'
with open(fp_user_agents, 'r') as fp:
user_agents = fp.readlines()
user_agents = [x.strip() for x in user_agents]
  # threaded worker: downloads one URL to its target filepath
  def pool_process(item):
    fp_out = item['filepath']
    try:
      # send the request with this item's User-Agent header; a per-request
      # Request avoids install_opener(), which sets a process-wide opener
      # and would race across threads
      req = urllib.request.Request(item['url'], headers={'User-Agent': item['user_agent']})
      with urllib.request.urlopen(req) as resp, open(fp_out, 'wb') as f:
        f.write(resp.read())
      item['status'] = True
    except Exception as e:
      if str(e) != 'HTTP Error 403: Forbidden':
        log.debug(f'Error: {e}')
      # leave an error marker file so this item is skipped on the next run
      with open(f'{fp_out}_error.txt', 'w') as fp:
        fp.write(str(e))
      item['status'] = False
    pbar.update(1)
    return item
# setup multithreading data holders
log.debug(f'loading {opt_fp_in}')
  df_records = pd.read_csv(opt_fp_in)
  records = df_records.to_dict('records')
  log.debug(f'loaded {len(records):,} csv records')
pool_items = []
n_skipped = 0
n_valids = 0
n_errors = 0
  sha256_seen = set()  # set membership keeps the dedup check O(1) per record

  for x in tqdm(records):
    sha256 = x['sha256']
    if sha256 in sha256_seen:
      continue
    sha256_seen.add(sha256)
    fp_dst = join(opt_fp_out, f'{sha256}.json')
    fp_dst_is_file = Path(fp_dst).is_file()
    fp_dst_is_err = Path(f'{fp_dst}_error.txt').is_file()
    if fp_dst_is_file:
      # already downloaded
      n_valids += 1
      n_skipped += 1
    elif fp_dst_is_err:
      # previously failed; error marker present
      n_errors += 1
      n_skipped += 1
    else:
      url = url_prefix + sha256 + '.json'
      user_agent = user_agents[randint(0, len(user_agents) - 1)]
      pool_items.append({'url': url, 'filepath': fp_dst, 'user_agent': user_agent})

  num_items = len(pool_items)
  log.info(f'Error files: {n_errors:,} items')
  log.info(f'Valid files: {n_valids:,} items')
  log.info(f'Skipping {n_skipped:,} items')
  log.info(f'Unique sha256s: {len(sha256_seen):,} items')
  log.info(f'Processing {num_items:,} items')
  # the raw records list is too large to keep in RAM; only pool_items are needed now
  del records

  # run the downloads across a thread pool with a single shared progress bar
  pool = ThreadPool(opt_threads)
  with tqdm(total=num_items) as pbar:
    pool_results = pool.map(pool_process, pool_items)
  pool.close()
  pool.join()
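
  # Persist per-item download status so failed URLs can be retried later.
  # A minimal sketch: the docstring mentions a logfile.csv, but the exact
  # path and columns used here are assumptions, not the original pipeline's.
  fp_log = join(opt_fp_out, 'logfile.csv')
  pd.DataFrame(pool_results).to_csv(fp_log, index=False)
  n_failed = sum(1 for r in pool_results if not r.get('status'))
  log.info(f'done: {num_items - n_failed:,} ok, {n_failed:,} failed, log: {fp_log}')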
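

if __name__ == '__main__':
  # standard click entry point, added as an assumption since the original
  # file ends without one; example invocation (paths are placeholders):
  #   python downloader.py -i records.csv -o /data/json_out -t 16
  cli()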