from app.settings import app_cfg

import os
from os.path import join

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import time
import numpy as np
import random
from subprocess import call
import cv2 as cv
from PIL import Image
from glob import glob
import tensorflow as tf
import tensorflow_hub as hub
import shutil
import h5py

tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

from app.search.json import save_params_latent, save_params_dense
from app.search.image import image_to_uint8, imconvert_uint8, imconvert_float32, \
    imread, imwrite, imgrid, resize_and_crop_image
from app.search.vector import truncated_z_sample, truncated_z_single, \
    create_labels, create_labels_uniform
from app.search.video import export_video
from app.search.params import timestamp


# Named endpoints of the InceptionV3 feature-vector module used for the feature loss.
feature_layer_names = {
    '1a': "InceptionV3/Conv2d_1a_3x3",
    '2a': "InceptionV3/Conv2d_2a_3x3",
    '2b': "InceptionV3/Conv2d_2b_3x3",
    '3a': "InceptionV3/Conv2d_3a_3x3",
    '3b': "InceptionV3/Conv2d_3b_3x3",
    '4a': "InceptionV3/Conv2d_4a_3x3",
    '5b': "InceptionV3/Mixed_5b",
    '5c': "InceptionV3/Mixed_5c",
    '5d': "InceptionV3/Mixed_5d",
    '6a': "InceptionV3/Mixed_6a",
    '6b': "InceptionV3/Mixed_6b",
    '6c': "InceptionV3/Mixed_6c",
    '6d': "InceptionV3/Mixed_6d",
    '6e': "InceptionV3/Mixed_6e",
    '7a': "InceptionV3/Mixed_7a",
    '7b': "InceptionV3/Mixed_7b",
    '7c': "InceptionV3/Mixed_7c",
}


# opt_feature_layers entries must be keys of feature_layer_names (e.g. '1a', '6e').
def find_nearest_vector_for_images(paths, opt_dims, opt_steps, opt_video, opt_tag,
                                   opt_limit=-1, opt_stochastic_clipping=True,
                                   opt_label_clipping=True, opt_use_feature_detector=False,
                                   opt_feature_layers=['1a', '2a', '4a', '7a'],
                                   opt_snapshot_interval=20, opt_clip_interval=500,
                                   opt_folder_id=59):

    tf.compat.v1.reset_default_graph()
    sess = tf.compat.v1.Session()

    print("Initializing generator...")
    generator = hub.Module('https://tfhub.dev/deepmind/biggan-512/2')

    fp_inverses = os.path.join(app_cfg.DIR_INVERSES, opt_tag)
    os.makedirs(fp_inverses, exist_ok=True)

    # save_params_latent(fp_inverses, opt_tag)
    save_params_dense(fp_inverses, opt_tag, folder_id=opt_folder_id)

    # One HDF5 row per input image: the cropped target, the optimized class
    # vector (y), the optimized latent vector (z), and the source filename.
    out_file = h5py.File(join(fp_inverses, 'dataset.latent.hdf5'), 'w')
    out_images = out_file.create_dataset('xtrain', (len(paths), 3, 512, 512,), dtype='float32')
    out_labels = out_file.create_dataset('ytrain', (len(paths), 1000,), dtype='float32')
    out_latent = out_file.create_dataset('latent', (len(paths), 128,), dtype='float32')
    out_fns = out_file.create_dataset('fn', (len(paths),), dtype=h5py.string_dtype())

    for index, path in enumerate(paths):
        if index == opt_limit:
            break
        out_fns[index] = os.path.basename(path)
        fp_frames = find_nearest_vector(sess, generator, path, opt_dims,
                                        out_images, out_labels, out_latent,
                                        opt_steps, index, opt_tag,
                                        opt_stochastic_clipping, opt_label_clipping,
                                        opt_use_feature_detector, opt_feature_layers,
                                        opt_snapshot_interval, opt_clip_interval,
                                        opt_video)
        if opt_video:
            export_video(fp_frames)

    # Flush and close the HDF5 file before tearing down the session.
    out_file.close()
    sess.close()


def find_nearest_vector(sess, generator, opt_fp_in, opt_dims, out_images, out_labels,
                        out_latent, opt_steps, index, opt_tag, opt_stochastic_clipping,
                        opt_label_clipping, opt_use_feature_detector, opt_feature_layers,
                        opt_snapshot_interval, opt_clip_interval, opt_video):
    """
    Find the closest latent and class vectors for an image.
    Store the class vector in an HDF5.
""" batch_size = 1 truncation = 1.0 z_dim = 128 vocab_size = 1000 img_size = 512 num_channels = 3 z_initial = truncated_z_sample(batch_size, z_dim, truncation/2) y_initial = create_labels_uniform(batch_size, vocab_size) input_z = tf.compat.v1.Variable(z_initial, dtype=np.float32, constraint=lambda t: tf.clip_by_value(t, -2.5, 2.5)) input_y = tf.compat.v1.Variable(y_initial, dtype=np.float32, constraint=lambda t: tf.clip_by_value(t, 0, 1.5)) input_trunc = tf.compat.v1.constant(1.0) output = generator({ 'z': input_z, 'y': input_y, 'truncation': input_trunc, }) target = tf.compat.v1.placeholder(tf.float32, shape=(batch_size, img_size, img_size, num_channels)) ## clip the Z encoding opt_clip = 1.5 clipped_latent = tf.where(tf.abs(input_z) >= opt_clip, tf.random.uniform([batch_size, z_dim], minval=-opt_clip, maxval=opt_clip), input_z) clipped_alpha = tf.compat.v1.placeholder(dtype=np.float32, shape=()) clip_latent = tf.assign(input_z, clipped_latent * (1 - clipped_alpha) + input_z * clipped_alpha) ## normalize the Y encoding # normalized_labels = tf.nn.l2_normalize(input_y) # tf.reduce_mean(tf.abs(encoding - gen_encoding)) normalized_labels = input_y / tf.math.maximum(1.0, tf.reduce_max(input_y)) normalized_alpha = tf.compat.v1.placeholder(dtype=np.float32, shape=()) clip_labels = tf.assign(input_y, normalized_labels * (1 - normalized_alpha) + input_y * normalized_alpha) ## if computing Feature loss, use these encoders if opt_use_feature_detector: print("Initializing feature detector...") pix_square_diff = tf.square((target - output) / 2.0) mse_loss = tf.reduce_mean(pix_square_diff) feature_extractor = hub.Module("https://tfhub.dev/google/imagenet/inception_v3/feature_vector/1") # Convert images from range [-1, 1] channels_first to [0, 1] channels_last. # gen_img_1 = tf.transpose(output / 2.0 + 0.5, [0, 2, 3, 1]) # target_img_1 = tf.transpose(target / 2.0 + 0.5, [0, 2, 3, 1]) gen_img_1 = output / 2.0 + 0.5 target_img_1 = target / 2.0 + 0.5 # Convert images to appropriate size for feature extraction. 
        height, width = hub.get_expected_image_size(feature_extractor)
        gen_img_1 = tf.image.resize_images(gen_img_1, [height, width])
        target_img_1 = tf.image.resize_images(target_img_1, [height, width])

        gen_feat_ex = feature_extractor(dict(images=gen_img_1), as_dict=True, signature='image_feature_vector')
        target_feat_ex = feature_extractor(dict(images=target_img_1), as_dict=True, signature='image_feature_vector')

        feat_loss = tf.constant(0.0)
        for layer in opt_feature_layers:
            layer_name = feature_layer_names[layer]
            gen_feat = gen_feat_ex[layer_name]
            target_feat = target_feat_ex[layer_name]
            feat_square_diff = tf.reshape(tf.square(gen_feat - target_feat), [batch_size, -1])
            feat_loss += tf.reduce_mean(feat_square_diff) / len(opt_feature_layers)

        loss = 1.0 * mse_loss + 1.0 * feat_loss

        z_lr = 0.001
        y_lr = 0.001
        optimizer_z = tf.train.AdamOptimizer(learning_rate=z_lr, beta1=0.9, beta2=0.999)
        train_step_z = optimizer_z.minimize(loss, var_list=[input_z])
        optimizer_y = tf.train.AdamOptimizer(learning_rate=y_lr, beta1=0.9, beta2=0.999)
        train_step_y = optimizer_y.minimize(loss, var_list=[input_y])
        reinit_optimizer_z = tf.variables_initializer(optimizer_z.variables())
        reinit_optimizer_y = tf.variables_initializer(optimizer_y.variables())

    else:
        z_lr = 0.001
        y_lr = 0.001
        loss = tf.compat.v1.losses.mean_squared_error(target, output)
        optimizer_z = tf.train.AdamOptimizer(learning_rate=z_lr, beta1=0.9, beta2=0.999)
        train_step_z = optimizer_z.minimize(loss, var_list=[input_z])
        optimizer_y = tf.train.AdamOptimizer(learning_rate=y_lr, beta1=0.9, beta2=0.999)
        train_step_y = optimizer_y.minimize(loss, var_list=[input_y])
        reinit_optimizer_z = tf.variables_initializer(optimizer_z.variables())
        reinit_optimizer_y = tf.variables_initializer(optimizer_y.variables())

    # train_step_z = tf.train.AdamOptimizer(z_lr).minimize(loss, var_list=[input_z], name='AdamOpterZ')
    # train_step_y = tf.train.AdamOptimizer(y_lr).minimize(loss, var_list=[input_y], name='AdamOpterY')

    target_im, fp_frames, fn_base = load_target_image(opt_fp_in, opt_video)

    # crop image and convert to format for next script
    phi_target_for_inversion = resize_and_crop_image(target_im, 512)
    b = np.dsplit(phi_target_for_inversion, 3)
    phi_target_for_inversion = np.stack(b).reshape((3, 512, 512))

    # create phi target for the latent / label pass
    phi_target = resize_and_crop_image(target_im, opt_dims)
    phi_target = np.expand_dims(phi_target, 0)
    phi_target = np.repeat(phi_target, batch_size, axis=0)

    # IMPORTANT: initialize variables before running the session
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.tables_initializer())

    feed_dict = {
        target: phi_target,
    }

    try:
        print("Preparing to iterate...")
        for i in range(opt_steps):
            curr_loss, _, _ = sess.run([loss, train_step_z, train_step_y], feed_dict=feed_dict)

            if i == 0:
                print("Iterating!")
            if i % 20 == 0:
                print('iter: {}, loss: {}'.format(i, curr_loss))

            if i > 0:
                if opt_stochastic_clipping and (i % opt_clip_interval) == 0:  # and i < opt_steps * 0.45:
                    sess.run(clip_latent, { clipped_alpha: 0.0 })
                    sess.run(reinit_optimizer_z)
                if opt_label_clipping and (i % opt_clip_interval) == 0:  # and i < opt_steps * 0.75:
                    # sess.run(clip_labels, { normalized_alpha: (i / opt_steps) ** 2 })
                    sess.run(clip_labels, { normalized_alpha: 0.0 })
                    sess.run(reinit_optimizer_y)

            if opt_video and opt_snapshot_interval != 0 and (i % opt_snapshot_interval) == 0:
                phi_guess = sess.run(output)
                guess_im = imgrid(imconvert_uint8(phi_guess), cols=1)
                fp_out_im = join(app_cfg.DIR_OUTPUTS, fp_frames,
                                 'frame_{:04d}.png'.format(int(i / opt_snapshot_interval)))
                imwrite(fp_out_im, guess_im)

    except KeyboardInterrupt:
        pass

    # Render and save the final reconstruction.
    phi_guess = sess.run(output)
    guess_im = imgrid(imconvert_uint8(phi_guess), cols=1)
    imwrite(join(app_cfg.DIR_OUTPUTS, 'frame-{}-{}-final.png'.format(opt_tag, fn_base)), guess_im)

    # Store the optimized latent and class vectors alongside the target image.
    z_guess, y_guess = sess.run([input_z, input_y])
    out_images[index] = phi_target_for_inversion
    out_labels[index] = y_guess
    out_latent[index] = z_guess

    return fp_frames


def load_target_image(opt_fp_in, opt_video):
    print("Loading {}".format(opt_fp_in))
    fn = os.path.basename(opt_fp_in)
    fn_base, ext = os.path.splitext(fn)
    fp_frames = "frames_{}_{}".format(fn_base, timestamp())
    fp_frames_fullpath = join(app_cfg.DIR_OUTPUTS, fp_frames)
    print("Output to {}".format(fp_frames_fullpath))
    if opt_video:
        os.makedirs(fp_frames_fullpath, exist_ok=True)
    target_im = imread(opt_fp_in)
    return target_im, fp_frames, fn_base
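

# Example usage: a minimal sketch, not part of the original module. The glob
# pattern, tag, and step count below are hypothetical placeholders; they only
# illustrate how find_nearest_vector_for_images might be driven. opt_dims is
# set to 512 to match the BigGAN-512 generator and the target placeholder above.
if __name__ == '__main__':
    example_paths = sorted(glob('data/targets/*.jpg'))  # hypothetical input folder
    if example_paths:
        find_nearest_vector_for_images(
            example_paths,
            opt_dims=512,            # resolution of the crop fed to the loss
            opt_steps=2000,          # optimization iterations per image (placeholder)
            opt_video=False,         # set True to write snapshot frames and export a video
            opt_tag='example_run')   # subfolder created under app_cfg.DIR_INVERSES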