import os
# Set before the tensorflow import below so the native log level is already
# in the environment when the library loads - keep this ordering.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import sys
import glob
import h5py
import numpy as np
import json
import tensorflow as tf
import tensorflow_probability as tfp
import tensorflow_hub as hub
import time
import random
from scipy.stats import truncnorm
from PIL import Image
from urllib.parse import parse_qs
import app.search.visualize as vs
from app.search.json import params_dense_dict
from app.utils.file_utils import load_pickle
from app.settings import app_cfg
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
# The RPC client lives outside this package; extend sys.path before importing it.
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), '../../../../live-cortex/rpc/'))
from rpc import CortexRPC
from app.search.params import timestamp
from app.utils.cortex_utils import results_folder, upload_file_to_cortex
from app.utils.tf_utils import read_checkpoint
from subprocess import Popen, PIPE
import easing_functions as easing

# frames per second
FPS = 25

# amount to smooth manual parameter changes. set to 1 to disable smoothing
SMOOTH_AMOUNT = 6

params = params_dense_dict('live')

# --------------------------
# Make directories.
# --------------------------
tag = "test"
OUTPUT_DIR = os.path.join('output', tag)
# exist_ok avoids the check-then-create race of testing os.path.exists first.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --------------------------
# Load Graph.
# --------------------------
print("Loading module...")
generator = hub.Module(str(params.generator_path))
print("Loaded!")

# Prefer the 'generator' signature; fall back to 'default' when the module
# does not expose one.
gen_signature = 'generator'
if 'generator' not in generator.get_signature_names():
    gen_signature = 'default'

input_info = generator.get_input_info_dict(gen_signature)

BATCH_SIZE = 1
Z_DIM = input_info['z'].get_shape().as_list()[1]
N_CLASS = input_info['y'].get_shape().as_list()[1]


# --------------------------
# Utils
# --------------------------

def clamp(n, a=0, b=1):
    """Return `n` limited to the closed interval [a, b]."""
    if n < a:
        return a
    if n > b:
        return b
    return n


# --------------------------
# Initializers
# --------------------------

def label_sampler(num_classes=1, shape=(BATCH_SIZE, N_CLASS,)):
    """Return a random label array of the given 2-D `shape`.

    For every row, `int(num_classes)` random columns are drawn (with
    replacement) and given a random weight; the row is then normalized to
    sum to 1. A row whose weights sum to zero (e.g. `num_classes` < 1) is
    left as zeros instead of being divided into NaNs.
    """
    label = np.zeros(shape)
    for i in range(shape[0]):
        for _ in range(int(num_classes)):
            j = random.randint(0, shape[1] - 1)
            label[i, j] = random.random()
        total = label[i].sum()
        if total > 0:
            label[i] /= total
    return label


def truncated_z_sample(shape=(BATCH_SIZE, Z_DIM,), truncation=1.0):
    """Sample `shape` values from a normal truncated to [-2, 2], scaled by `truncation`."""
    values = truncnorm.rvs(-2, 2, size=shape)
    return truncation * values


def normal_z_sample(shape=(BATCH_SIZE, Z_DIM,)):
    """Sample `shape` values from a standard normal distribution."""
    return np.random.normal(size=shape)


# --------------------------
# More complex ops
# --------------------------

class SinParam:
    """Graph node producing `sin(time + noise) * radius + cos(time + noise) * radius`.

    Creates the feedable params `<name>_radius`, `<name>_speed` and
    `<name>_time`, plus a noise source, and registers itself in
    `interpolator.sin_params` so it is advanced once per step.

    Args:
        name: Prefix for the created params and the registry key.
        shape: Shape of the noise input.
        datatype: Datatype passed to the noise param(s).
        lerp: When true the noise is a LerpParam (`<name>_noise`) that is
            re-randomized periodically; otherwise a single fixed
            InterpolatorParam (`<name>_noise_a`).
        radius: Initial value of `<name>_radius`.
    """

    def __init__(self, name, shape, datatype="noise", lerp=True, radius=0.25):
        orbit_radius = InterpolatorParam(name=name + '_radius', value=radius, smooth=True)
        orbit_speed = InterpolatorParam(name=name + '_speed', value=FPS, smooth=True)
        orbit_time = InterpolatorParam(name=name + '_time', value=0.0)
        if lerp:
            noise = LerpParam(name + '_noise', shape=shape, datatype=datatype, ease=easing.CubicEaseInOut)
            noise_out = noise.output
        else:
            noise = InterpolatorParam(name + '_noise_a', shape=shape, datatype=datatype)
            noise_out = noise.variable
        sin = tf.math.sin(orbit_time.variable + noise_out) * orbit_radius.variable
        cos = tf.math.cos(orbit_time.variable + noise_out) * orbit_radius.variable
        output = sin + cos
        interpolator.sin_params[name] = self
        self.name = name
        self.orbit_speed = orbit_speed
        self.orbit_time = orbit_time
        self.output = output
        self.noise = noise
        self.lerp = lerp
        self.t = 0  # steps since the noise was last re-randomized

    def update(self):
        """Advance the orbit by one step and re-randomize the noise when due."""
        self.orbit_time.assign(self.orbit_time.value + (np.pi) / self.orbit_speed.value, immediate=True)
        self.t += 1
        # Only a LerpParam noise source has `n` / `speed` / `switch`.
        if not self.lerp:
            return
        # randomize the orbit when possible -
        # - check if we've done one full orbital period
        # - check if the noise is done transitioning
        # The explicit grouping matters: without it `and` binds tighter than
        # `or`, so `n == 1` alone triggered a switch on every settled frame.
        period_elapsed = self.t >= (self.noise.speed.value + self.orbit_speed.value)
        noise_settled = self.noise.n.value == 0 or self.noise.n.value == 1
        if period_elapsed and noise_settled:
            self.noise.switch()
            self.t = 0


class LerpParam:
    """Graph node computing `a * (1 - n) + b * n`, with eased transitions.

    When both `a_in` and `b_in` are given they are used as fixed graph
    inputs; otherwise two feedable params `<name>_a` / `<name>_b` are
    created. Also creates `<name>_n` (mix amount), `<name>_t` (linear
    transition position) and `<name>_speed`, and registers itself in
    `interpolator.lerp_params`.

    Args:
        name: Prefix for the created params and the registry key.
        shape: Shape of the `a` / `b` params when they are created here.
        a_in: Optional existing tensor for the `a` end.
        b_in: Optional existing tensor for the `b` end.
        datatype: Datatype passed to the created `a` / `b` params.
        ease: Easing class; instantiated with start=0, end=1, duration=1.
    """

    def __init__(self, name, shape, a_in=None, b_in=None, datatype="noise", ease=easing.QuadEaseInOut):
        if a_in is not None and b_in is not None:
            a = InterpolatorVariable(variable=a_in)
            b = InterpolatorVariable(variable=b_in)
        else:
            a = InterpolatorParam(name=name + '_a', shape=shape, datatype=datatype)
            b = InterpolatorParam(name=name + '_b', shape=shape, datatype=datatype)
        n = InterpolatorParam(name=name + '_n', value=0.0, smooth=True)
        t = InterpolatorParam(name=name + '_t', value=0.0, smooth=False)
        speed = InterpolatorParam(name=name + '_speed', value=FPS, smooth=True)
        output = a.variable * (1 - n.variable) + b.variable * n.variable
        interpolator.lerp_params[name] = self
        self.name = name
        self.a = a
        self.b = b
        self.n = n
        self.t = t
        self.ease = ease(start=0, end=1, duration=1)
        self.speed = speed
        self.output = output
        self.direction = 0  # -1: moving toward a, 1: moving toward b, 0: idle

    def switch(self, target_value=None):
        """Start a transition toward the end that is currently further away.

        The far end is given `target_value`, or re-randomized when no value
        is supplied, before the transition starts.
        """
        self.t.value = self.n.value
        if self.t.value > 0.5:
            target_param = self.a
            self.direction = -1
        else:
            target_param = self.b
            self.direction = 1
        if target_value is None:
            target_param.randomize()
        else:
            target_param.assign(target_value)

    def update(self):
        """Advance an active transition by one step; no-op when idle."""
        if self.direction != 0:
            self.t.value = clamp(self.t.value + self.direction / self.speed.value)
            self.n.assign(self.ease.ease(self.t.value), immediate=True)
            print("set_opt: {}_n {}".format(self.name, self.n.value))
            # The transition is finished once the eased amount hits either end.
            if self.n.value == 0 or self.n.value == 1:
                self.direction = 0


# --------------------------
# Placeholder params
# --------------------------
class InterpolatorParam:
    """A feedable graph input: a `tf.placeholder` plus the Python-side value fed to it.

    Every instance registers itself in `interpolator.opts` under `name`;
    smooth float params are additionally registered in
    `interpolator.smooth_params` so they are eased once per step.

    Args:
        name: Registry key of the param.
        dtype: dtype of the placeholder.
        shape: Shape of the placeholder; `()` marks the param as scalar.
        value: Initial value for "float" params (falsy values become 0.0).
        datatype: "float", or one of the kinds handled by `randomize()`
            ("noise", "label", "encoding"); non-float params start randomized.
        smooth: Ease non-immediate assignments over several steps.
    """

    def __init__(self, name, dtype=tf.float32, shape=(), value=None, datatype="float", smooth=False):
        self.scalar = shape == ()
        self.shape = shape
        self.datatype = datatype
        self.smooth = smooth
        if datatype == "float":
            self.assign(value or 0.0, immediate=True)
            if self.smooth:
                interpolator.smooth_params[name] = self
        else:
            self.randomize()
        self.variable = tf.placeholder(dtype=dtype, shape=shape)
        interpolator.opts[name] = self

    def assign(self, value, immediate=False):
        """Set the target value of the param.

        Non-smooth params, immediate assignments and the very first
        assignment take effect at once. Otherwise only `next_value` changes
        and `update()` eases `value` toward it. (Previously both branches
        overwrote `value`, which silently disabled smoothing.)
        """
        if self.datatype == 'float':
            value = float(value)
        self.next_value = value
        if immediate or not self.smooth or not hasattr(self, 'value'):
            self.value = value

    def update(self):
        """Move `value` one smoothing step toward `next_value`."""
        self.value = (self.value * (SMOOTH_AMOUNT - 1) + self.next_value) / (SMOOTH_AMOUNT)

    def randomize(self):
        """Assign a fresh value appropriate for this param's datatype."""
        if self.datatype == 'noise':
            val = truncated_z_sample(shape=self.shape, truncation=interpolator.opts['truncation'].value)
        elif self.datatype == 'label':
            val = label_sampler(shape=self.shape, num_classes=interpolator.opts['num_classes'].value)
        elif self.datatype == 'encoding':
            val = np.zeros(self.shape)
        else:
            val = 0.0
        self.assign(val)


class InterpolatorVariable:
    """Wraps an existing graph tensor so it can stand in for an InterpolatorParam.

    It has no Python-side value and is not registered in `interpolator.opts`;
    `assign` and `randomize` are deliberate no-ops.
    """

    def __init__(self, variable, smooth=False):
        self.scalar = False
        self.variable = variable
        self.smooth = smooth

    def assign(self, value=None, immediate=False):
        # Accepts the same arguments as InterpolatorParam.assign so callers
        # such as LerpParam.switch(target_value=...) do not raise TypeError.
        pass

    def randomize(self):
        pass


# --------------------------
# Interpolator graph
# --------------------------

class Interpolator:
    """Owns all feedable params and the generator graph, and steps them per frame."""

    def __init__(self):
        self.paused = False
        self.stopped = False
        self.opts = {}           # name -> InterpolatorParam (everything that is fed)
        self.sin_params = {}     # name -> SinParam
        self.lerp_params = {}    # name -> LerpParam
        self.smooth_params = {}  # name -> smooth float InterpolatorParam
        self.load_disentangled_latents()

    def build(self):
        """Build the full graph and store the generator output in `self.gen_img`."""
        InterpolatorParam(name='truncation', value=1.0)
        InterpolatorParam(name='num_classes', value=1.0)

        # Latent - initial lerp and wobble
        lerp_z = LerpParam('latent', shape=[BATCH_SIZE, Z_DIM], datatype="noise")
        sin_z = SinParam('orbit', shape=[BATCH_SIZE, Z_DIM], datatype="noise")
        z_sum = lerp_z.output + sin_z.output

        # Latent - saturation
        saturation = InterpolatorParam(name='saturation', value=1.0, smooth=True)
        z_abs = z_sum / tf.abs(z_sum) * saturation.variable
        z_mix = LerpParam('saturation_mix', a_in=z_sum, b_in=z_abs, shape=[BATCH_SIZE, Z_DIM], datatype="input")

        # Latent - disentangled vectors
        zoom = InterpolatorParam(name='zoom', value=0.0, smooth=True).variable * self.disentangled['zoom'] * -1
        shiftx = InterpolatorParam(name='shiftx', value=0.0, smooth=True).variable * self.disentangled['shiftx']
        shifty = InterpolatorParam(name='shifty', value=0.0, smooth=True).variable * self.disentangled['shifty']
        luminance = InterpolatorParam(name='luminance', value=0.0, smooth=True).variable * self.disentangled['luminance']
        disentangled = z_mix.output + zoom + shiftx + shifty + luminance

        # Latent - stored vector
        # latent_stored = InterpolatorParam(name='latent_stored', shape=[BATCH_SIZE, Z_DIM], datatype="noise")
        latent_stored = LerpParam(name='latent_stored', shape=[BATCH_SIZE, Z_DIM], datatype="noise")
        latent_stored_mix = LerpParam('latent_stored_mix', a_in=disentangled, b_in=latent_stored.output, shape=[BATCH_SIZE, Z_DIM], datatype="input")

        # Label
        lerp_label = LerpParam('label', shape=[BATCH_SIZE, N_CLASS], datatype="label")

        # Latent - stored vector - now that we have deep labels, we don't need this
        # label_stored = InterpolatorParam(name='label_stored', shape=[BATCH_SIZE, N_CLASS], datatype="label")
        # label_stored = LerpParam(name='label_stored', shape=[BATCH_SIZE, N_CLASS], datatype="label")
        # label_stored_mix = LerpParam('label_stored_mix', a_in=lerp_label.output, b_in=label_stored.output, shape=[BATCH_SIZE, Z_DIM], datatype="input")
        # gen_in['y'] = label_stored_mix.output

        # Generator
        gen_in = {}
        gen_in['truncation'] = 1.0  # self.opts['truncation'].variable
        gen_in['z'] = latent_stored_mix.output
        gen_in['y'] = lerp_label.output
        self.gen_img = generator(gen_in, signature=gen_signature)

        # Encoding - first hidden layer
        gen_layer_name = 'module_apply_{}/{}'.format(gen_signature, params.inv_layer)
        encoding_latent = tf.get_default_graph().get_tensor_by_name(gen_layer_name)
        encoding_shape = encoding_latent.get_shape().as_list()
        encoding_shape_np = tuple([1,] + encoding_shape[1:])
        encoding_latent_placeholder = tf.constant(np.zeros(encoding_shape_np, dtype=np.float32))
        encoding_stored = LerpParam('encoding_stored', shape=encoding_shape_np, datatype="noise")
        encoding_stored_sin = SinParam('encoding_orbit', shape=encoding_shape_np, datatype="noise", radius=0.02)
        encoding_stored_sum = encoding_stored.output + encoding_stored_sin.output
        encoding_stored_mix = LerpParam('encoding_stored_mix', a_in=encoding_latent_placeholder, b_in=encoding_stored_sum, shape=encoding_shape_np, datatype="encoding")

        # Use the placeholder to redirect parts of the graph.
        # - computed encoding goes into the encoding_mix
        # - encoding mix output goes into the main biggan graph
        # We do it this way so the encoding_latent won't be going into two places at once.
        tf.contrib.graph_editor.swap_ts(encoding_latent_placeholder, encoding_latent)
        tf.contrib.graph_editor.swap_ts(encoding_stored_mix.output, encoding_latent_placeholder)

        # Make all the stored lerps use the same interpolation amount.
        tf.contrib.graph_editor.reroute_ts(encoding_stored.n.variable, latent_stored.n.variable)
        # tf.contrib.graph_editor.reroute_ts(encoding_stored.n.variable, label_stored.n.variable)
        tf.contrib.graph_editor.reroute_ts(encoding_stored_mix.n.variable, latent_stored_mix.n.variable)
        # tf.contrib.graph_editor.reroute_ts(encoding_stored_mix.n.variable, label_stored_mix.n.variable)

        layer_op_names = [
            "Generator_2/concat",
            "Generator_2/concat_1",
            "Generator_2/concat_2",
            "Generator_2/concat_3",
            "Generator_2/concat_4",
            "Generator_2/concat_5",
            "Generator_2/concat_6",
        ]
        op_input_index = 1
        first_mix = encoding_stored_mix
        # first_mix = None
        # split_shape = [1, 128]
        for layer_id, layer_op_name in enumerate(layer_op_names):
            tensor_name = 'module_apply_{}/linear_1/MatMul:0'.format(gen_signature)
            layer_tensor = tf.get_default_graph().get_tensor_by_name(tensor_name)
            op_name = 'module_apply_{}/{}'.format(gen_signature, layer_op_name)
            layer_op = tf.get_default_graph().get_operation_by_name(op_name)

            ## unlike the encoding, here we update the layer op directly, so we don't need a temporary constant to swap
            # raw_layer_placeholder = tf.constant(np.zeros(layer_tensor.shape, dtype=np.float32))
            layer_stored = LerpParam('layer_stored_{}'.format(layer_id), shape=layer_tensor.shape, datatype="noise")
            layer_stored_mix = LerpParam('layer_stored_mix_{}'.format(layer_id), a_in=layer_tensor, b_in=layer_stored.output, shape=layer_tensor.shape, datatype="noise")
            layer_op._update_input(op_input_index, layer_stored_mix.output)

            # Make all the stored lerps use the same interpolation amount.
            if first_mix is None:
                first_mix = layer_stored_mix
            else:
                tf.contrib.graph_editor.reroute_ts(first_mix.n.variable, layer_stored_mix.n.variable)

        sys.stderr.write("Sin params: {}\n".format(", ".join(self.sin_params.keys())))
        sys.stderr.write("Lerp params: {}\n".format(", ".join(self.lerp_params.keys())))
        sys.stderr.write("Smooth params: {}\n".format(", ".join(self.smooth_params.keys())))
        sys.stderr.write("Opts: {}\n".format(", ".join(self.opts.keys())))

    def load_disentangled_latents(self):
        """Load the 'walk' tensors from the disentangled checkpoints into `self.disentangled`."""
        def read_walk(name):
            return read_checkpoint(os.path.join(app_cfg.DIR_DISENTANGLED, '{}/model.ckpt'.format(name)), 'walk')

        self.disentangled = {
            name: read_walk(name)[:, :, 0]
            for name in ('zoom', 'shiftx', 'shifty', 'rotate2d', 'rotate3d')
        }
        # The color checkpoint carries three slices on its last axis, stored
        # here as r / g / b; luminance is their sum.
        disentangled_color = read_walk('color')
        self.disentangled['r'] = disentangled_color[:, :, 0]
        self.disentangled['g'] = disentangled_color[:, :, 1]
        self.disentangled['b'] = disentangled_color[:, :, 2]
        self.disentangled['luminance'] = np.sum(disentangled_color, axis=2)

    def get_feed_dict(self):
        """Return `{placeholder: current value}` for every registered param."""
        return {param.variable: param.value for param in self.opts.values()}

    def get_state(self):
        """Return the current value of every scalar param, plus the `paused` flag."""
        opt = {}
        for key, param in self.opts.items():
            if param.scalar:
                if isinstance(param.value, np.ndarray):
                    sys.stderr.write('{} is ndarray\n'.format(key))
                    opt[key] = param.value.tolist()
                else:
                    opt[key] = param.value
        opt['paused'] = self.paused
        return opt

    def set_value(self, key, value):
        """Assign `float(value)` to the param named `key`; report unknown keys on stderr."""
        if key in self.opts:
            self.opts[key].assign(float(value))
        else:
            sys.stderr.write('{} not a valid option\n'.format(key))

    def set_category(self, category):
        """Lerp the label toward an even mix of the given class indices.

        Args:
            category: Whitespace-separated class indices, e.g. "12 207".
                Tokens that are not integers or fall outside
                [0, N_CLASS) are ignored; if none remain, the label is
                left unchanged.
        """
        print("Set category: {}".format(category))
        label = np.zeros((BATCH_SIZE, N_CLASS,))
        for token in category.split():
            try:
                index = int(token)
            except ValueError:
                sys.stderr.write('{} is not a valid category index\n'.format(token))
                continue
            # Class 0 is a valid index; only negatives and overflow are rejected.
            if 0 <= index < N_CLASS:
                label[0, index] = 1.0
        total = label[0].sum()
        if total == 0:
            # Normalizing an all-zero row would feed NaNs to the generator.
            sys.stderr.write('no valid category in "{}"\n'.format(category))
            return
        label[0] /= total
        self.lerp_params['label'].switch(target_value=label)

    def set_encoding(self, opt):
        """Load a stored vector file and show it, lerping if one is already shown.

        Args:
            opt: Dict with key 'id'; the file `file_<id>.pkl` under
                `app_cfg.DIR_VECTORS` is loaded.
        """
        next_id = opt['id']
        # NOTE(review): `next_id` arrives from an RPC payload and is
        # interpolated into a path that is then unpickled - confirm the
        # sender is trusted or validate the id.
        data = load_pickle(os.path.join(app_cfg.DIR_VECTORS, "file_{}.pkl".format(next_id)))
        new_latent = np.expand_dims(data['latent'], axis=0)
        new_encoding = np.expand_dims(data['encoding'], axis=0)

        latent_stored = self.lerp_params['latent_stored']
        encoding_stored = self.lerp_params['encoding_stored']
        encoding_stored_mix = self.lerp_params['encoding_stored_mix']

        # Pair every per-layer lerp with the value it should receive.
        layer_updates = [
            (self.lerp_params['layer_stored_{}'.format(i)], np.expand_dims(layer_label, axis=0))
            for i, layer_label in enumerate(data['layer_labels'])
        ]

        # if we're showing an encoding already, lerp to the next one
        if encoding_stored_mix.n.value > 0:
            encoding_stored.switch(target_value=new_encoding)
            latent_stored.switch(target_value=new_latent)
            for layer, value in layer_updates:
                layer.switch(target_value=value)
        # otherwise (we're showing the latent)...
        else:
            # jump to the stored encoding, then switch
            # (`n` is a smooth param, so the jump must be immediate)
            if encoding_stored.n.value < 0.5:
                encoding_stored.n.assign(0, immediate=True)
                encoding_stored.a.assign(new_encoding)
                latent_stored.a.assign(new_latent)
                for layer, value in layer_updates:
                    layer.a.assign(value)
            else:
                encoding_stored.n.assign(1, immediate=True)
                encoding_stored.b.assign(new_encoding)
                latent_stored.b.assign(new_latent)
                for layer, value in layer_updates:
                    layer.b.assign(value)
            encoding_stored_mix.switch()

    def on_step(self, i, sess):
        """Advance every animated param by one step and run the generator.

        Args:
            i: Step index (unused here).
            sess: Session used to evaluate `self.gen_img`.

        Returns:
            The result of running `self.gen_img` with the current feed dict.
        """
        for param in self.sin_params.values():
            param.update()
        for param in self.lerp_params.values():
            param.update()
        for param in self.smooth_params.values():
            param.update()
        return sess.run(self.gen_img, feed_dict=self.get_feed_dict())

    def run(self, cmd, payload):
        """Execute a remote command; unknown commands are ignored."""
        print("Command: {} {}".format(cmd, payload))
        if cmd == 'switch':
            if payload in self.lerp_params:
                self.lerp_params[payload].switch()
        elif cmd == 'setCategory':
            self.set_category(payload)
        elif cmd == 'setEncoding':
            self.set_encoding(json.loads(payload))
        elif cmd == 'play':
            self.paused = False
            # Report the state actually set (the messages used to be inverted).
            print("set_opt: paused {}".format(self.paused))
        elif cmd == 'pause':
            self.paused = True
            print("set_opt: paused {}".format(self.paused))
        elif cmd == 'stop':
            self.stopped = True


# --------------------------
# RPC Listener
# --------------------------

interpolator = Interpolator()


class Listener:
    """Bridges the Cortex RPC client and the module-level `interpolator`."""

    def connect(self):
        """Create the RPC client, wiring this object's callbacks into it."""
        self.rpc_client = CortexRPC(self.on_get, self.on_set, self.on_ready, self.on_cmd)

    def on_set(self, key, value):
        """RPC callback: set one interpolator option."""
        print("{}: {} {}".format(key, str(type(value)), value))
        interpolator.set_value(key, value)

    def on_get(self):
        """RPC callback: print and return the interpolator state."""
        state = interpolator.get_state()
        for key, value in state.items():
            print("set_opt: {} {}".format(key, value))
        return state

    def on_cmd(self, cmd, payload):
        """RPC callback: forward a command to the interpolator."""
        print("got command {}".format(cmd))
        interpolator.run(cmd, payload)

    def on_ready(self, rpc_client):
        """RPC callback: build the graph, render frames into ffmpeg, upload the video."""
        from subprocess import DEVNULL

        self.rpc_client = rpc_client
        print("Starting session...")
        self.sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True))
        self.sess.run(tf.global_variables_initializer())
        self.sess.run(tf.tables_initializer())
        print("Building interpolator...")
        interpolator.build()
        self.rpc_client.send_status('processing', True)
        self.on_get()

        tag = "biggan_" + timestamp()
        fp_out = os.path.join(app_cfg.DIR_RENDERS, '{}.mp4'.format(tag))

        # ffmpeg reads PNG frames from stdin and writes the video to fp_out.
        # Its stdout is unused, so it is discarded rather than piped: an
        # unread pipe can only ever block one side.
        pipe = Popen([
            'ffmpeg',
            '-hide_banner',
            '-y',
            '-f', 'image2pipe',
            '-vcodec', 'png',
            '-r', str(FPS),
            '-i', '-',
            '-c:v', 'libx264',
            '-preset', 'slow',
            '-crf', '19',
            '-vf', 'fps={}'.format(FPS),
            '-pix_fmt', 'yuv420p',
            '-s', '512x512',
            '-r', str(FPS),
            fp_out
        ], stdin=PIPE, stdout=DEVNULL)

        try:
            self.run(interpolator, pipe)
        finally:
            # Always release the session and let ffmpeg finish, even when
            # the render loop raised.
            self.rpc_client.send_status('processing', False)
            self.sess.close()
            print("Writing video...")
            pipe.stdin.close()
            pipe.wait()

        print("Uploading video...")
        folder = results_folder()
        data = upload_file_to_cortex(folder['id'], fp_out, datatype='video', activity='live')
        # print(json.dumps(data, indent=2))
        print("Done!")

    def run(self, interpolator, pipe):
        """Render loop: generate a frame, pipe it to ffmpeg, send a preview over RPC.

        Args:
            interpolator: Object providing `on_step`, `paused` and `stopped`.
            pipe: Process whose `stdin` accepts PNG frames.
        """
        max_steps = 99999
        gen_time_total = 0
        to_pil_time_total = 0
        save_time_total = 0
        send_time_total = 0
        for i in range(max_steps):
            if i == 0:
                print("Loading network...")
            elif i == 1:
                print("Processing!")
            elif interpolator.stopped:
                # Checked before `paused` so a stop request is honoured
                # while paused.
                print("Stopping...")
                return
            elif interpolator.paused:
                time.sleep(0.5)
                continue

            gen_time = time.time()
            gen_images = interpolator.on_step(i, self.sess)
            # Step 0 only loads the network; its frame is discarded and it
            # is excluded from the timings.
            if i == 0:
                continue
            gen_time_total += time.time() - gen_time

            if gen_images is None:
                print("Exiting...")
                break

            to_pil_time = time.time()
            out_img = vs.data2pil(gen_images[0])
            to_pil_time_total += time.time() - to_pil_time

            if out_img is None:
                print("Got None instead of an image...?")
                return

            save_time = time.time()
            # Writing to stdin blocks while ffmpeg is behind, which paces
            # the loop. (The former readline() loop on ffmpeg's stdout
            # blocked until EOF and has been removed.)
            out_img.save(pipe.stdin, format='png', compression_level=3)
            save_time_total += time.time() - save_time

            img_to_send = out_img.resize((256, 256), Image.BICUBIC)
            meta = {
                'i': i,
                'sequence_i': i,
                'skip_i': 0,
                'sequence_len': max_steps,
            }
            send_time = time.time()
            self.rpc_client.send_pil_image("frame_{:05d}.png".format(i+1), meta, img_to_send, 'jpg')
            send_time_total += time.time() - send_time

            if (i % 100) == 0 or i == 1:
                print("step: {}, gen: {:.2f}, pil: {:.2f}, save: {:.2f}, send: {:.2f}".format(i, gen_time_total / i, to_pil_time_total / i, save_time_total / i, send_time_total / i))