1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
import click
@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True,
help='Input')
@click.option('-o', '--output', 'opt_fp_out', required=True,
help='Output')
@click.option('-t', '--threads', 'opt_threads', default=8, show_default=True,
help='Number of threads')
@click.option('--wayback', 'opt_wayback', is_flag=True, default=False,
help='Check Wayback archive for URL and download cached image')
@click.option('--url', 'opt_key_url', default='url', help='Field name for URL', show_default=True)
@click.option('--filepath', 'opt_key_filepath', default='filepath', help='Field name for filepath', show_default=True)
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_key_filepath, opt_key_url, opt_wayback):
"""Threaded image downloader"""
"""
CSV should be formatted as
|url|filepath|
|---|---|
|https:/site.com/photo.jpg|myfolder/myname.jpg|
Saves logfile.csv output and uses for errors
"""
from os.path import join
from functools import partial
from pathlib import Path
from multiprocessing.dummy import Pool as ThreadPool
import urllib
import pandas as pd
from tqdm import tqdm
from app.utils import file_utils
from app.utils.logger_utils import Logger
log = Logger.getLogger()
# setup multithreading function
def pool_process(item):
# threaded function
url_wayback_base = 'https://archive.org/wayback/available?url='
fp_out = item['filepath']
try:
# download image
file_utils.mkdirs(item['filepath'])
urllib.request.urlretrieve(item['url'], fp_out)
item['status'] = True
except Exception as e:
log.debug(f'Error: {e}, url: {item["url"]}')
estr = str(e)
if item['opt_wayback']:
if 'HTTP Error' in estr:
# TODO add/parse/handle request for wayback machine archive
url_wayback = url_wayback_base + item['url']
fp_error = f'{fp_out}_error.txt'
with open(fp_error, 'w') as fp:
fp.write('')
item['status'] = False
pbar.update(1)
return item
# setup multithreading data holds
log.debug(f'loading {opt_fp_in}')
records = pd.read_csv(opt_fp_in).to_dict('records')
pool_items = []
log.debug(f'Initializing multithreaded pool...')
for x in tqdm(records):
fp_dst = join(opt_fp_out, x[opt_key_filepath])
fp_dst_is_file = Path(fp_dst).is_file()
fp_dst_is_err = Path(f'{fp_dst}_error.txt').is_file()
if not fp_dst_is_file and not fp_dst_is_err:
pool_items.append({'url':x[opt_key_url], 'filepath': fp_dst, 'opt_wayback': opt_wayback})
num_items = len(pool_items)
log.info(f'Going to download {num_items:,} files')
pool_results = []
# run the multithreading with progress bar
pbar = tqdm(total=num_items)
pool_process = partial(pool_process)
pool = ThreadPool(opt_threads)
with tqdm(total=num_items) as pbar:
pool_results = pool.map(pool_process, pool_items)
pbar.close()
|