#!/usr/bin/env python2.7
#
# Frame interpolation with sepconv: generate a single in-between frame,
# morph between two images or two frame sequences, or double the frame
# rate of a video.

import os
import sys
import getopt
import subprocess
import numpy
import torch
import PIL
import PIL.Image

from datetime import datetime

from network import Network, process

from moviepy.video.io.ffmpeg_reader import FFMPEG_VideoReader
from moviepy.video.io.ffmpeg_writer import FFMPEG_VideoWriter

##########################################################

arguments_strModel = 'lf'
arguments_strFirst = './images/first.png'
arguments_strSecond = './images/second.png'
arguments_strOut = './result.png'
arguments_strVideo = False
arguments_strVideoOut = datetime.now().strftime("sepconv_%Y%m%d_%H%M.mp4")
arguments_steps = 0
arguments_dilate = 1
arguments_smooth = False
arguments_aOffset = 0
arguments_bOffset = 0
arguments_mixVideos = False
arguments_averageVideos = False
arguments_endpoint = None
arguments_dataset = "dataset"

for strOption, strArgument in getopt.getopt(sys.argv[1:], '', [strParameter[2:] + '=' for strParameter in sys.argv[1::2]])[0]:
    print("{}: {}".format(strOption, strArgument))
    if strOption == '--model':
        arguments_strModel = strArgument  # which model to use, l1 or lf, please see our paper for more details
    elif strOption == '--first':
        arguments_strFirst = strArgument  # path to the first frame (or frame directory)
    elif strOption == '--second':
        arguments_strSecond = strArgument  # path to the second frame (or frame directory)
    elif strOption == '--out':
        arguments_strOut = strArgument  # path to where the output should be stored
    elif strOption == '--video':
        arguments_strVideo = strArgument  # path to the input video
    elif strOption == '--video-out':
        arguments_strVideoOut = strArgument  # path to where the video should be stored
    elif strOption == '--steps':
        arguments_steps = int(strArgument)  # number of in-between frames to generate
    elif strOption == '--dilate':
        arguments_dilate = int(strArgument)  # extra interpolation factor between consecutive frames
    elif strOption == '--smooth':
        arguments_smooth = bool(strArgument)  # any non-empty value enables smoothing
    elif strOption == '--mix-videos':
        arguments_mixVideos = bool(strArgument)  # any non-empty value enables the morph-two-videos mode
    elif strOption == '--average-videos':
        arguments_averageVideos = bool(strArgument)  # any non-empty value enables the average-two-videos mode
    elif strOption == '--a-offset':
        arguments_aOffset = int(strArgument)  # frame-number offset into the first sequence
    elif strOption == '--b-offset':
        arguments_bOffset = int(strArgument)  # frame-number offset into the second sequence
    elif strOption == '--endpoint':
        arguments_endpoint = strArgument  # optional HTTP endpoint to POST the rendered video to
    elif strOption == '--dataset':
        arguments_dataset = strArgument  # dataset name reported to the endpoint

if not os.path.exists('./renders'):
    os.mkdir('renders')

moduleNetwork = Network(arguments_strModel).cuda()
tensorOutput = torch.FloatTensor()
index = 0


def recurse_two_frames(moduleNetwork, tensorOutput, a_np, b_np, frame_index, morph_index, step, depth=0):
    # Binary-search towards the in-between frame at position frame_index by
    # repeatedly interpolating the midpoint and halving the step.
    print("generate {} {} {}".format(frame_index, morph_index, step))
    tensorInputFirst = torch.FloatTensor(a_np)
    tensorInputSecond = torch.FloatTensor(b_np)
    process(moduleNetwork, tensorInputFirst, tensorInputSecond, tensorOutput)
    middle_np = tensorOutput.clamp(0.0, 1.0).numpy()
    if morph_index == frame_index:
        print("frame {}, depth {}".format(frame_index, depth))
        return middle_np
    if morph_index > frame_index:
        next_index = morph_index - step
        next_a_np = a_np
        next_b_np = middle_np
        # print("next index: {} - {}".format(next_index, step))
    else:
        next_index = morph_index + step
        next_a_np = middle_np
        next_b_np = b_np
        # print("next index: {} + {}".format(next_index, step))
    return recurse_two_frames(moduleNetwork, tensorOutput, next_a_np, next_b_np, frame_index, next_index, step / 2, depth + 1)


def recurse_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset, count, step, frame_index, dilate):
    # Visit the frame indices of the two sequences in binary-tree order and
    # blend frame `frame_index` of sequence a with the matching frame of b,
    # weighted by how far frame_index is through the clip.
    global index
    index += 1
    if (index % 10) == 0:
        print("{}...".format(index))
    step /= 2
    a_fn = os.path.join(a, "frame_{:0>5}.png".format(int(frame_index + a_offset)))
    b_fn = os.path.join(b, "frame_{:0>5}.png".format(int(frame_index + b_offset)))
    print("{} => {}".format(a_fn, b_fn))
    a_np = load_image(a_fn)
    b_np = load_image(b_fn)
    img_np = recurse_two_frames(moduleNetwork, tensorOutput, a_np, b_np, frame_index, count / 2, count / 4)
    if step < 2 * dilate:
        return [img_np]
    else:
        left = recurse_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset, count, step, frame_index - (step / 2), dilate)
        right = recurse_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset, count, step, frame_index + (step / 2), dilate)
        return left + [img_np] + right


def process_two_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset, steps, dilate):
    # Morph between two frame sequences stored as directories of
    # frame_XXXXX.png files.
    steps *= dilate
    return recurse_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset, steps, steps, steps / 2, dilate)


def average_two_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset, steps, dilate):
    # Blend the two sequences frame by frame: one network pass per pair, so
    # every output frame is the interpolated midpoint of the two inputs.
    global index
    index += 1
    if (index % 10) == 0:
        print("{}...".format(index))
    frames = []
    steps *= dilate
    for i in range(1, steps * dilate + 1, dilate):
        a_fn = os.path.join(a, "frame_{:0>5}.png".format(int(i + a_offset)))
        b_fn = os.path.join(b, "frame_{:0>5}.png".format(int(i + b_offset)))
        print("{} => {}".format(a_fn, b_fn))
        a_np = load_image(a_fn)
        b_np = load_image(b_fn)
        tensorInputFirst = torch.FloatTensor(a_np)
        tensorInputSecond = torch.FloatTensor(b_np)
        process(moduleNetwork, tensorInputFirst, tensorInputSecond, tensorOutput)
        middle_np = tensorOutput.clamp(0.0, 1.0).numpy()
        frames.append(middle_np)
    return frames


def process_tree(moduleNetwork, a, b, tensorOutput, steps, dilate):
    # Recursively subdivide the interval between frames a and b, returning the
    # in-between frames in playback order.
    global index
    index += 1
    if (index % 10) == 0:
        print("{}...".format(index))
    tensorInputFirst = torch.FloatTensor(a)
    tensorInputSecond = torch.FloatTensor(b)
    process(moduleNetwork, tensorInputFirst, tensorInputSecond, tensorOutput)
    middle_np = tensorOutput.clamp(0.0, 1.0).numpy()
    if steps < 4 * dilate:
        return [middle_np]
    else:
        left = process_tree(moduleNetwork, a, middle_np, tensorOutput, steps / 2, dilate)
        right = process_tree(moduleNetwork, middle_np, b, tensorOutput, steps / 2, dilate)
        return left + [middle_np] + right


def smooth_frames(moduleNetwork, tensorOutput, frames, smooth):
    # For every other frame, emit the frame, an interpolation of it and the
    # frame two positions later, and that later frame.
    if not smooth:
        return frames
    print("smoothing every other frame")
    firstFrame = frames[0]
    new_frames = [firstFrame]
    for i in range(1, len(frames) - 2, 2):
        firstFrame = frames[i]
        nextFrame = frames[i + 2]
        tensorInputFirst = torch.FloatTensor(firstFrame)
        tensorInputSecond = torch.FloatTensor(nextFrame)
        process(moduleNetwork, tensorInputFirst, tensorInputSecond, tensorOutput)
        middle_np = tensorOutput.clamp(0.0, 1.0).numpy()
        new_frames += [firstFrame]
        new_frames += [middle_np]
        new_frames += [nextFrame]
    new_frames += [frames[len(frames) - 1]]
    return new_frames


def dilate_frames(moduleNetwork, tensorOutput, frames, dilate):
    # Insert interpolated frames between every pair of consecutive frames.
    if dilate < 2:
        return frames
    print("dilating by a factor of {}".format(dilate))
    new_frames = []
    nextFrame = frames[0]
    for i in range(1, len(frames)):
        firstFrame = nextFrame
        nextFrame = frames[i]
        new_frames += [firstFrame]
        new_frames += process_tree(moduleNetwork, firstFrame, nextFrame, tensorOutput, dilate, 1)
    new_frames += [nextFrame]
    return new_frames


def store_frames(frames, outputPath, inputFirst=None, inputSecond=None, endpoint=None, dataset="unknown"):
    # Write the frames to an MP4 (frame size and fps are hardcoded) and
    # optionally POST the result to an HTTP endpoint.
    print('writing {}'.format(outputPath))
    print('frames: {}'.format(len(frames)))
    writer = FFMPEG_VideoWriter(outputPath, (1024, 512), 25)
    if inputFirst is not None:
        writer.write_frame(tensor_to_image(inputFirst))
    for frame in frames:
        writer.write_frame(tensor_to_image(frame))
    if inputSecond is not None:
        writer.write_frame(tensor_to_image(inputSecond))
    writer.close()
    if endpoint is not None:
        subprocess.call([
            "curl", "-X", "POST",
            "-F", "module=morph",
            "-F", "activity=morph",
            "-F", "generated=true",
            "-F", "dataset=" + dataset,
            "-F", "datatype=video",
            "-F", "file=@" + outputPath,
            endpoint
        ])


def tensor_to_image(np_val):
    # CHW, BGR, float in [0, 1] -> HWC, RGB, uint8 for the ffmpeg writer.
    return (numpy.rollaxis(np_val, 0, 3)[:, :, ::-1] * 255.0).astype(numpy.uint8)


def load_image(path):
    # Read an image into a CHW, BGR, float32 array in [0, 1].
    return numpy.rollaxis(numpy.asarray(PIL.Image.open(path))[:, :, ::-1], 2, 0).astype(numpy.float32) / 255.0


def load_image_tensor(path):
    return torch.FloatTensor(load_image(path))


if arguments_strVideo and arguments_strVideoOut:
    # Double the frame rate of a video by interpolating one frame between
    # every pair of consecutive frames.
    reader = FFMPEG_VideoReader(arguments_strVideo, False)
    writer = FFMPEG_VideoWriter(arguments_strVideoOut, reader.size, reader.fps * 2)
    reader.initialize()
    nextFrame = reader.read_frame()
    for x in range(0, reader.nframes - 1):
        firstFrame = nextFrame
        nextFrame = reader.read_frame()
        tensorInputFirst = torch.FloatTensor(numpy.rollaxis(firstFrame[:, :, ::-1], 2, 0).astype(numpy.float32) / 255.0)
        tensorInputSecond = torch.FloatTensor(numpy.rollaxis(nextFrame[:, :, ::-1], 2, 0).astype(numpy.float32) / 255.0)
        process(moduleNetwork, tensorInputFirst, tensorInputSecond, tensorOutput)
        writer.write_frame(firstFrame)
        writer.write_frame((numpy.rollaxis(tensorOutput.clamp(0.0, 1.0).numpy(), 0, 3)[:, :, ::-1] * 255.0).astype(numpy.uint8))
    writer.write_frame(nextFrame)
    writer.close()

elif arguments_mixVideos:
    # Morph two videos (directories of frame_XXXXX.png files).
    print("morph two videos...")
    print("{} => {}".format(arguments_strFirst, arguments_strSecond))
    outputPath = './renders/' + arguments_strVideoOut
    frames = process_two_videos(moduleNetwork, tensorOutput, arguments_strFirst, arguments_strSecond, arguments_aOffset, arguments_bOffset, arguments_steps, arguments_dilate)
    frames = smooth_frames(moduleNetwork, tensorOutput, frames, arguments_smooth)
    frames = dilate_frames(moduleNetwork, tensorOutput, frames, arguments_dilate)
    store_frames(frames, outputPath, endpoint=arguments_endpoint, dataset=arguments_dataset)

elif arguments_averageVideos:
    # Average two videos frame by frame.
    print("average two videos...")
    print("{} => {}".format(arguments_strFirst, arguments_strSecond))
    outputPath = './renders/' + arguments_strVideoOut
    frames = average_two_videos(moduleNetwork, tensorOutput, arguments_strFirst, arguments_strSecond, arguments_aOffset, arguments_bOffset, arguments_steps, arguments_dilate)
    frames = smooth_frames(moduleNetwork, tensorOutput, frames, arguments_smooth)
    frames = dilate_frames(moduleNetwork, tensorOutput, frames, arguments_dilate)
    store_frames(frames, outputPath, endpoint=arguments_endpoint, dataset=arguments_dataset)

elif arguments_steps == 0:
    # Process a single image pair: write one interpolated frame.
    tensorInputFirst = load_image_tensor(arguments_strFirst)
    tensorInputSecond = load_image_tensor(arguments_strSecond)
    process(moduleNetwork, tensorInputFirst, tensorInputSecond, tensorOutput)
    PIL.Image.fromarray((numpy.rollaxis(tensorOutput.clamp(0.0, 1.0).numpy(), 0, 3)[:, :, ::-1] * 255.0).astype(numpy.uint8)).save(arguments_strOut)

else:
    # Morph two images into a video.
    print("{} => {}".format(arguments_strFirst, arguments_strSecond))
    inputFirst = load_image(arguments_strFirst)
    inputSecond = load_image(arguments_strSecond)
    outputPath = './renders/' + arguments_strVideoOut
    frames = process_tree(moduleNetwork, inputFirst, inputSecond, tensorOutput, arguments_steps * arguments_dilate, arguments_dilate)
    frames = smooth_frames(moduleNetwork, tensorOutput, frames, arguments_smooth)
{}".format(arguments_dilate)) frames = dilate_frames(moduleNetwork, tensorOutput, frames, arguments_dilate) store_frames(frames, outputPath, inputFirst, inputSecond, endpoint=arguments_endpoint, dataset=arguments_dataset)