1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
import click
from app.settings import types
from app.utils import click_utils
from app.settings import app_cfg as cfg
from app.utils.logger_utils import Logger
# Module-level logger from the project's Logger helper; cli() below uses it for
# status and error messages.
log = Logger.getLogger()
@click.command()
@click.option('-i', '--input', 'opt_fp_in', required=True,
              help='Input CSV file (columns: index, subdir, fn, ext)')
@click.option('-m', '--media', 'opt_dir_media', required=True,
              help='Input media directory')
@click.option('-o', '--output', 'opt_fp_out', required=True,
              help='Output CSV file (columns: sha256, index)')
@click.option('--slice', 'opt_slice', type=(int, int), default=(None, None),
              help='Slice list of files')
@click.option('-t', '--threads', 'opt_threads', default=4,
              help='Number of threads')
@click.option('-f', '--force', 'opt_force', is_flag=True,
              help='Force overwrite file')
@click.pass_context
def cli(ctx, opt_fp_in, opt_dir_media, opt_fp_out, opt_slice, opt_threads, opt_force):
    """Compute SHA-256 hashes of media files using a pool of worker threads.

    Reads the CSV at ``opt_fp_in`` (expects ``index``, ``subdir``, ``fn`` and
    ``ext`` columns), optionally restricts it to a positional row slice, hashes
    each ``<opt_dir_media>/<subdir>/<fn>.<ext>`` with ``opt_threads`` threads,
    and writes a CSV with ``sha256`` and ``index`` columns to ``opt_fp_out``.
    The elapsed hashing+writing time is logged so thread counts can be compared.

    Does nothing (logs an error) if ``opt_fp_out`` exists and ``--force`` is
    not given.
    """
    from os.path import join
    from pathlib import Path
    import time
    from multiprocessing.dummy import Pool as ThreadPool

    import pandas as pd
    from tqdm import tqdm

    from app.utils import file_utils

    if not opt_force and Path(opt_fp_out).exists():
        log.error('File exists. Use "-f / --force" to overwrite')
        return

    df_files = pd.read_csv(opt_fp_in).set_index('index')
    if opt_slice:
        # Positional (row-number) slice; (None, None) keeps every row.
        start, stop = opt_slice
        df_files = df_files.iloc[start:stop]
    log.info('Processing {:,} images'.format(len(df_files)))

    # prepare list of images to multithread into sha256s
    file_objs = [
        {
            'fp': join(opt_dir_media, str(row.subdir), f"{row.fn}.{row.ext}"),
            'index': row.Index,
        }
        for row in df_files.itertuples()
    ]

    def as_sha256(file_obj):
        # Executed in a worker thread; adds the file's hash to its record.
        file_obj['sha256'] = file_utils.sha256(file_obj['fp'])
        return file_obj

    st = time.time()
    # The context manager guarantees the pool's threads are torn down.
    # imap yields results in input order as they finish, so the progress bar
    # is advanced from the main thread and counts completed hashes only.
    with ThreadPool(opt_threads) as pool:
        pool_file_objs = list(
            tqdm(pool.imap(as_sha256, file_objs), total=len(file_objs))
        )

    # keep only the columns written to the output CSV
    data = [
        {'sha256': file_obj['sha256'], 'index': file_obj['index']}
        for file_obj in pool_file_objs
    ]

    # save to CSV; explicit columns keep the header even when nothing was hashed
    file_utils.mkdirs(opt_fp_out)
    df = pd.DataFrame(data, columns=['sha256', 'index'])
    df.to_csv(opt_fp_out, index=False)

    # timing
    log.info('time: {:.2f}, threads: {}'.format(time.time() - st, opt_threads))
|