'''
Click command that builds the file-record CSV (sha256, uuid, identity key,
image dimensions) for the images of a dataset.
'''

import click

from app.settings import types
from app.utils import click_utils
from app.settings import app_cfg as cfg
from app.utils.logger_utils import Logger

log = Logger.getLogger()

# Source used to derive each record's identity key:
#   'subdir'  - last component of the image's directory, relative to the input root
#               eg "lfw/media/original/batch_1/train/barack_obama/001.jpg" --> "barack_obama"
#   'numeric' - an incrementing integer, one per accepted image
identity_sources = ['subdir', 'numeric']


@click.command()
@click.option('-i', '--input', 'opt_fp_in', default=None,
              help='Override enum input filename CSV')
@click.option('-o', '--output', 'opt_fp_out', default=None,
              help='Override enum output filename CSV')
@click.option('-m', '--media', 'opt_dir_media', default=None,
              help='Override enum media directory')
@click.option('--data_store', 'opt_data_store',
              type=cfg.DataStoreVar,
              default=click_utils.get_default(types.DataStore.HDD),
              show_default=True,
              help=click_utils.show_help(types.DataStore))
@click.option('--dataset', 'opt_dataset',
              type=cfg.DatasetVar,
              required=True,
              show_default=True,
              help=click_utils.show_help(types.Dataset))
@click.option('--slice', 'opt_slice', type=(int, int), default=(None, None),
              help='Slice list of files')
@click.option('-t', '--threads', 'opt_threads', default=12,
              help='Number of threads')
@click.option('-f', '--force', 'opt_force', is_flag=True,
              help='Force overwrite file')
@click.option('--identity', 'opt_identity', type=click.Choice(identity_sources),
              required=True,
              help='Identity source key')
@click.option('--recursive/--no-recursive', 'opt_recursive', is_flag=True, default=False,
              help='Use glob recursion (slower)')
@click.option('--max-depth', 'opt_max_depth', default=None, type=int,
              help='Max number of images per subdirectory')
@click.pass_context
def cli(ctx, opt_fp_in, opt_fp_out, opt_dataset, opt_data_store, opt_dir_media,
        opt_slice, opt_threads, opt_identity, opt_force, opt_recursive, opt_max_depth):
    """Generates sha256, uuid, and identity index CSV file"""
    # NOTE: the docstring above is shown verbatim by click as the command help,
    # so the longer description lives in these comments instead.
    #
    # Steps:
    #   1. glob jpg/png files under the input directory
    #   2. optionally cap the number of images kept per subdirectory (--max-depth)
    #   3. optionally slice the file list (--slice)
    #   4. hash + measure every image in a thread pool, skipping unusable files
    #   5. write one CSV row per accepted image, plus a ".sh" file with the command line
    #
    # opt_dir_media and ctx are accepted for CLI compatibility but are not used here.

    # imports are kept function-local, as in the rest of this command
    import sys
    import os
    from pathlib import Path
    from multiprocessing.dummy import Pool as ThreadPool
    import uuid

    from PIL import Image
    import cv2 as cv
    import pandas as pd
    from tqdm import tqdm

    from app.models.data_store import DataStore
    from app.utils import file_utils, im_utils

    # images must be strictly larger than this in both dimensions
    min_dim = 60

    # set data_store
    data_store = DataStore(opt_data_store, opt_dataset)
    # get filepath out
    fp_out = data_store.metadata(types.Metadata.FILE_RECORD) if opt_fp_out is None else opt_fp_out
    # exit if exists
    if not opt_force and Path(fp_out).exists():
        log.error(f'File {fp_out} exists. Use "-f / --force" to overwite')
        return

    # ----------------------------------------------------------------
    # glob files

    fp_in = opt_fp_in if opt_fp_in is not None else data_store.media_images_original()
    log.info(f'Globbing {fp_in}')
    fp_ims = file_utils.glob_multi(fp_in, ['jpg', 'png'], recursive=opt_recursive)
    log.info('Found {:,} images'.format(len(fp_ims)))

    if opt_max_depth:
        log.debug(f'using max depth: {opt_max_depth}')
        # group files by the name of their immediate parent directory;
        # setdefault().append() keeps the first file of every group
        # (the previous if/else dropped it)
        subdir_groups = {}
        for fp_im in fp_ims:
            subdir_groups.setdefault(Path(fp_im).parent.name, []).append(fp_im)
        # for each subgroup, limit number of files
        fp_ims = [fp_im
                  for items in subdir_groups.values()
                  for fp_im in items[:opt_max_depth]]
        log.debug(f'num subdirs: {len(subdir_groups)}')

    # fail if none
    if not fp_ims:
        log.error('No images. Try with "--recursive"')
        return

    # slice to reduce
    # the default (None, None) is a non-empty tuple, so this keeps the full list
    if opt_slice:
        fp_ims = fp_ims[opt_slice[0]:opt_slice[1]]
    log.info('Found {:,} images'.format(len(fp_ims)))

    # ----------------------------------------------------------------
    # multithread process into SHA256

    def pool_mapper(fp_im):
        """Hash and measure one image; returns None if the file is unusable."""
        # pbar is bound by the `with tqdm(...)` below before the pool starts
        pbar.update(1)
        try:
            sha256 = file_utils.sha256(fp_im)
            # context manager closes the file handle even when verify() raises
            with Image.open(fp_im) as im_pil:
                im_pil.verify()  # throws error if bad file
                w_pil, h_pil = im_pil.size
            # explicit check instead of assert, which is stripped under "python -O"
            if not (w_pil > min_dim and h_pil > min_dim):
                raise ValueError(f'image smaller than {min_dim}px: {w_pil}x{h_pil}')
            im = cv.imread(fp_im)
            # cv.imread signals failure by returning None instead of raising
            if im is None:
                raise ValueError('cv.imread could not decode file')
            h, w = im.shape[:2]
            file_size_kb = os.stat(fp_im).st_size // 1000
            num_channels = im_utils.num_channels(im)
        except Exception as e:
            # per-file boundary: one bad file must not abort the whole run
            log.warn(f'skipping file: {fp_im} ({e})')
            return None
        # NOTE: file_size_kb is returned here but is not written to the CSV below
        return {
            'width': w,
            'height': h,
            'sha256': sha256,
            'file_size_kb': file_size_kb,
            'num_channels': num_channels
        }

    # convert to thread pool; both the progress bar and the pool are released on exit
    with tqdm(total=len(fp_ims)) as pbar, ThreadPool(opt_threads) as pool:
        pool_maps = pool.map(pool_mapper, fp_ims)

    # ----------------------------------------------------------------
    # convert data to dict

    data = []
    identity_count = 0
    for pool_map, fp_im in zip(pool_maps, fp_ims):
        if pool_map is None:
            continue  # skip error files (already logged by pool_mapper)
        fpp_im = Path(fp_im)
        subdir = str(fpp_im.parent.relative_to(fp_in))

        if opt_identity == 'subdir':
            # use last part of subdir path; str.split always yields >= 1 element
            identity = subdir.split('/')[-1]
        elif opt_identity == 'numeric':
            # use incrementing number
            identity = identity_count
            identity_count += 1
        else:
            identity = ''

        data.append({
            'subdir': subdir,
            'num_channels': pool_map['num_channels'],
            'fn': fpp_im.stem,
            'ext': fpp_im.suffix.replace('.', ''),
            'sha256': pool_map['sha256'],
            'uuid': uuid.uuid4(),
            'identity_key': identity,
            'width': pool_map['width'],
            'height': pool_map['height']
        })

    # nothing usable: do not write an empty, column-less CSV
    if not data:
        log.error('No valid images to record')
        return

    # create dataframe
    df_records = pd.DataFrame.from_dict(data)

    df_records.index.name = 'index'  # reassign 'index' as primary key column
    # write to CSV
    file_utils.mkdirs(fp_out)
    df_records.to_csv(fp_out)
    # done
    log.info(f'wrote {len(df_records)} rows to "{fp_out}"')
    # save script
    cmd_line = ' '.join(sys.argv)
    file_utils.write_text(cmd_line, '{}.sh'.format(fp_out))


# NOTE: disabled legacy code that assigned a per-identity 'identity_index'
# column; kept for reference only, it is never executed.
'''
# create dataframe
df_records = pd.DataFrame.from_dict(data)

# add identity key (used for associating identity)
if opt_identity:
  log.info(f'adding identity index using: "{opt_identity}" subdirectory')
  # convert dict to DataFrame
  # sort based on identity_key
  df_records = df_records.sort_values(by=['identity_key'], ascending=True)
  # add new column for identity
  df_records['identity_index'] = [-1] * len(df_records)
  # populate the identity_index
  df_records_identity_groups = df_records.groupby('identity_key')
  # enumerate groups to create identity indices
  log.info(f'updating records with identity_key. This may take a while...')
  st = time.time()
  for identity_index, df_records_identity_group_tuple in enumerate(df_records_identity_groups):
    identity_key, df_records_identity_group = df_records_identity_group_tuple
    for ds_record in df_records_identity_group.itertuples():
      df_records.at[ds_record.Index, 'identity_index'] = identity_index
  # reset index after being sorted
  df_records = df_records.reset_index(drop=True)
  log.debug('update time: {:.2f}s'.format(time.time() - st))
else:
  # name everyone person 1, 2, 3...
  df_records = df_records.sort_values(by=['subdir'], ascending=True)
  pass
'''