1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
import click
@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True,
  help='Input')
@click.option('-o', '--output', 'opt_fp_out', required=True,
  help='Output')
@click.option('-t', '--threads', 'opt_threads', default=8,
  help='Number of threads')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads):
  """Threaded image downloader.

  CSV should be formatted as

  |url|filepath|
  |---|---|
  |https://site.com/photo.jpg|myfolder/myname.jpg|

  Items that already exist on disk, or that have a ``<filepath>_error.txt``
  marker from a previous failed attempt, are skipped on re-runs.
  """
  from os.path import join
  from pathlib import Path
  from multiprocessing.dummy import Pool as ThreadPool
  # NB: plain `import urllib` does not expose the `request` submodule,
  # which made the worker raise AttributeError at download time
  import urllib.request
  import pandas as pd
  from tqdm import tqdm
  from app.utils import file_utils
  from app.utils.logger_utils import Logger

  log = Logger.getLogger()

  def pool_process(item):
    """Download one image (runs in a worker thread); sets item['status']."""
    fp_out = item['filepath']
    try:
      # ensure the destination directory exists, then download
      file_utils.mkdirs(item['filepath'])
      urllib.request.urlretrieve(item['url'], fp_out)
      item['status'] = True
    except Exception as e:
      log.debug(f'Error: {e}')
      # leave an empty marker file so this URL is skipped on re-runs
      fp_error = f'{fp_out}_error.txt'
      with open(fp_error, 'w') as fp:
        fp.write('')
      item['status'] = False
    pbar.update(1)
    return item

  # build the work list, skipping already-downloaded / already-failed items
  log.debug(f'loading {opt_fp_in}')
  records = pd.read_csv(opt_fp_in).to_dict('records')
  pool_items = []
  for x in tqdm(records):
    fp_dst = join(opt_fp_out, x['filepath'])
    fp_dst_is_file = Path(fp_dst).is_file()
    fp_dst_is_err = Path(f'{fp_dst}_error.txt').is_file()
    if not fp_dst_is_file and not fp_dst_is_err:
      pool_items.append({'url': x['url'], 'filepath': fp_dst})

  num_items = len(pool_items)
  log.info(f'processing {num_items:,} items')

  # run the downloads with a shared progress bar; the `with` blocks
  # guarantee the bar is closed and the pool is terminated/joined
  # even if an error propagates out of pool.map
  with tqdm(total=num_items) as pbar:
    with ThreadPool(opt_threads) as pool:
      pool_results = pool.map(pool_process, pool_items)
|