import click


@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True,
  help='Input CSV with url,filepath columns')
@click.option('-o', '--output', 'opt_fp_out', required=True,
  help='Output directory for downloaded images')
@click.option('-t', '--threads', 'opt_threads', default=8, show_default=True,
  help='Number of threads')
@click.option('--wayback', 'opt_wayback', is_flag=True, default=False,
  help='Check Wayback archive for URL and download cached image')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_wayback):
  """Threaded image downloader"""
  
  """
  CSV should be formatted as 
  
  |url|filepath|
  |---|---|
  |https:/site.com/photo.jpg|myfolder/myname.jpg|

  Saves logfile.csv output and uses for errors
  """
  
  from os.path import join
  from pathlib import Path
  from multiprocessing.dummy import Pool as ThreadPool
  import urllib.request

  import pandas as pd
  from tqdm import tqdm
  from app.utils import file_utils
  from app.utils.logger_utils import Logger

  log = Logger.getLogger()

  # worker function run by each thread in the pool
  def pool_process(item):
    url_wayback_base = 'https://archive.org/wayback/available?url='
    fp_out = item['filepath']
    try:
      # download image, creating the destination directory if needed
      file_utils.mkdirs(fp_out)
      urllib.request.urlretrieve(item['url'], fp_out)
      item['status'] = True
    except Exception as e:
      log.debug(f'Error: {e}, url: {item["url"]}')
      if item['opt_wayback'] and 'HTTP Error' in str(e):
        # TODO add/parse/handle request for wayback machine archive
        # (see the wayback_snapshot_url sketch below)
        url_wayback = url_wayback_base + item['url']
      # write an empty error marker so this URL is skipped on re-runs
      fp_error = f'{fp_out}_error.txt'
      with open(fp_error, 'w') as fp:
        fp.write('')
      item['status'] = False
    pbar.update(1)
    return item
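
  # A possible implementation of the wayback TODO above: a minimal sketch that
  # queries the Wayback Machine availability API for the closest archived
  # snapshot of a URL. Not yet wired into pool_process; the response shape
  # ('archived_snapshots' -> 'closest') is taken from the public API.
  def wayback_snapshot_url(url):
    import json
    import urllib.parse
    query = 'https://archive.org/wayback/available?url=' + urllib.parse.quote(url, safe='')
    with urllib.request.urlopen(query) as resp:
      data = json.loads(resp.read().decode('utf-8'))
    closest = data.get('archived_snapshots', {}).get('closest', {})
    # 'available' is True when a cached copy exists; 'url' is the snapshot URL
    return closest.get('url') if closest.get('available') else None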

  # load the CSV of url,filepath records
  log.debug(f'loading {opt_fp_in}')
  records = pd.read_csv(opt_fp_in).to_dict('records')

  # build the work list, skipping files already downloaded or marked as errors
  pool_items = []
  log.debug('Building download list...')
  for x in tqdm(records):
    fp_dst = join(opt_fp_out, x['filepath'])
    fp_dst_is_file = Path(fp_dst).is_file()
    fp_dst_is_err = Path(f'{fp_dst}_error.txt').is_file()
    if not fp_dst_is_file and not fp_dst_is_err:
      pool_items.append({'url': x['url'], 'filepath': fp_dst, 'opt_wayback': opt_wayback})

  num_items = len(pool_items)
  log.info(f'Going to download {num_items:,} files')
  # run the downloads in a thread pool with a shared progress bar;
  # pool_process updates pbar as each item completes
  pool = ThreadPool(opt_threads)
  with tqdm(total=num_items) as pbar:
    pool_results = pool.map(pool_process, pool_items)
  pool.close()
  pool.join()
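
  # summarize results; each failed item left an '_error.txt' marker on disk
  num_ok = sum(1 for item in pool_results if item['status'])
  log.info(f'Downloaded {num_ok:,} of {num_items:,} files')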