#!/usr/bin/env python2.7
"""Command-line driver that interpolates frames with the ``network`` module.

Depending on the flags, the script either writes a single interpolated image
(``--steps 0``), or renders an .mp4 into ``./renders`` by interpolating between
two images (default), between one frame of each of two frame directories
(``--mix-images``), or along two frame directories (``--mix-videos`` /
``--average-videos``).  Frame directories are expected to contain files named
``frame_00001.png``, ``frame_00002.png``, ...
"""
import os
from subprocess import call
import sys
import getopt
import numpy
import argparse
import torch
import PIL
import PIL.Image
from datetime import datetime
from network import Network, process
from moviepy.video.io.ffmpeg_reader import FFMPEG_VideoReader
from moviepy.video.io.ffmpeg_writer import FFMPEG_VideoWriter

##########################################################

# Frame rate of videos written by store_frames; also multiplied with --padding
# to get the number of padding frames.
FPS = 25

parser = argparse.ArgumentParser()
parser.add_argument('--model', type=str, default='lf')  # l1 or lf
parser.add_argument('--first', type=str, default='./images/first.png')
parser.add_argument('--second', type=str, default='./images/second.png')
parser.add_argument('--out', type=str, default='./result.png')
# NOTE(review): --video is a boolean flag, yet the video branch below passes
# opt.video to FFMPEG_VideoReader as if it were a path.  Confirm the intended
# CLI (probably type=str) before relying on that branch.
parser.add_argument('--video', action='store_true')
parser.add_argument('--video-out', type=str, default=datetime.now().strftime("sepconv_%Y%m%d_%H%M.mp4"))
parser.add_argument('--steps', type=int, default=0)
parser.add_argument('--dilate', type=int, default=1)
parser.add_argument('--smooth', action='store_true')
parser.add_argument('--mix-videos', action='store_true')
parser.add_argument('--average-videos', action='store_true')
# Previously missing: opt.mix_images is read in the dispatch chain at the
# bottom of the file, which raised AttributeError without this declaration.
parser.add_argument('--mix-images', action='store_true')
parser.add_argument('--a-offset', type=int, default=0)
parser.add_argument('--b-offset', type=int, default=0)
parser.add_argument('--padding', type=int, default=0)
parser.add_argument('--endpoint', type=str, default='')
parser.add_argument('--dataset', type=str, default='dataset')
opt = parser.parse_args()
args = vars(opt)

print('------------ Options -------------')
for k, v in sorted(args.items()):
    print('%s: %s' % (str(k), str(v)))
print('-------------- End ----------------')

if not os.path.exists('./renders'):
    os.mkdir('renders')

moduleNetwork = Network(opt.model).cuda()
# Shared output buffer handed to every process() call.
tensorOutput = torch.FloatTensor()
# Global progress counter, bumped by the frame generators below.
index = 0


def _interpolate(moduleNetwork, tensorOutput, a_np, b_np):
    """Run the network on two frames and return the result as a numpy array.

    Both inputs are wrapped in ``torch.FloatTensor``; ``process`` is expected
    to leave its result in ``tensorOutput``, which is then clamped to [0, 1].
    ``clamp`` returns a new tensor, so the returned array does not alias the
    shared buffer.
    """
    tensorInputFirst = torch.FloatTensor(a_np)
    tensorInputSecond = torch.FloatTensor(b_np)
    process(moduleNetwork, tensorInputFirst, tensorInputSecond, tensorOutput)
    return tensorOutput.clamp(0.0, 1.0).numpy()


def recurse_two_frames(moduleNetwork, tensorOutput, a_np, b_np, frame_index, morph_index, step, depth=0):
    """Bisect between two frames until ``morph_index`` reaches ``frame_index``.

    Each call interpolates the midpoint of ``a_np`` and ``b_np``.  If the
    midpoint index is not the requested one, the search continues in the half
    that contains ``frame_index``, moving ``morph_index`` by ``step`` and
    halving ``step``.

    Returns the interpolated frame as a numpy array.
    """
    print("generate {} {} {}".format(frame_index, morph_index, step))
    middle_np = _interpolate(moduleNetwork, tensorOutput, a_np, b_np)
    if morph_index == frame_index:
        print("frame {}, depth {}".format(frame_index, depth))
        return middle_np
    if step == 0:
        # The index can no longer move, so the target is unreachable with this
        # bisection; return the closest frame instead of recursing forever.
        print("frame {}, depth {}".format(frame_index, depth))
        return middle_np
    if morph_index > frame_index:
        next_index = morph_index - step
        next_a_np = a_np
        next_b_np = middle_np
    else:
        next_index = morph_index + step
        next_a_np = middle_np
        next_b_np = b_np
    # Floor division keeps the indices integral on Python 2 and Python 3.
    return recurse_two_frames(moduleNetwork, tensorOutput, next_a_np, next_b_np,
                              frame_index, next_index, step // 2, depth + 1)


def recurse_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset, count, step, frame_index, dilate):
    """Recursively build a list of frames morphing directory ``a`` into ``b``.

    For ``frame_index`` the matching frame of each directory (shifted by its
    offset) is loaded and blended via ``recurse_two_frames``; the function then
    recurses to the left and right of ``frame_index`` with a halved ``step``
    until ``step < 2 * dilate``.

    Returns a list of numpy frames in index order.  A position whose source
    file is missing in either directory yields ``[None]`` and is not
    subdivided further.
    """
    global index
    index += 1
    if (index % 10) == 0:
        print("{}...".format(index))
    step //= 2
    a_fn = os.path.join(a, "frame_{:0>5}.png".format(int(frame_index + a_offset)))
    b_fn = os.path.join(b, "frame_{:0>5}.png".format(int(frame_index + b_offset)))
    print("{}".format(a_fn))
    print("=> {}".format(b_fn))
    if not os.path.exists(a_fn) or not os.path.exists(b_fn):
        return [None]
    a_np = load_image(a_fn)
    b_np = load_image(b_fn)
    img_np = recurse_two_frames(moduleNetwork, tensorOutput, a_np, b_np,
                                frame_index, count // 2, count // 4)
    if step < 2 * dilate:
        return [img_np]
    half = step // 2
    left = recurse_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset,
                          count, step, frame_index - half, dilate)
    right = recurse_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset,
                           count, step, frame_index + half, dilate)
    return left + [img_np] + right


def process_two_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset, steps, dilate):
    """Morph frame directory ``a`` into ``b`` over ``steps * dilate`` positions.

    Thin entry point for ``recurse_videos`` starting at the middle position.
    """
    steps *= dilate
    return recurse_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset,
                          steps, steps, steps // 2, dilate)


def average_two_videos(moduleNetwork, tensorOutput, a, b, a_offset, b_offset, steps, dilate):
    """Interpolate the midpoint between corresponding frames of two directories.

    Walks frame numbers ``1, 1 + dilate, ...`` (below ``steps * dilate**2 + 1``),
    loads the frame from each directory (shifted by its offset) and appends
    their interpolated midpoint.  Positions where either file is missing are
    skipped.

    Returns the list of midpoint frames as numpy arrays.
    """
    global index
    index += 1
    if (index % 10) == 0:
        print("{}...".format(index))
    frames = []
    steps *= dilate
    for i in range(1, steps * dilate + 1, dilate):
        a_fn = os.path.join(a, "frame_{:0>5}.png".format(int(i + a_offset)))
        b_fn = os.path.join(b, "frame_{:0>5}.png".format(int(i + b_offset)))
        print("{}".format(a_fn))
        print("=> {}".format(b_fn))
        # Both inputs are required; the previous `and` only skipped when both
        # were missing and then crashed in load_image when one was.
        if not os.path.exists(a_fn) or not os.path.exists(b_fn):
            continue
        a_np = load_image(a_fn)
        b_np = load_image(b_fn)
        frames.append(_interpolate(moduleNetwork, tensorOutput, a_np, b_np))
    return frames


def process_tree(moduleNetwork, a, b, tensorOutput, steps, dilate):
    """Recursively interpolate in-between frames for the pair ``a`` / ``b``.

    The midpoint is generated first; while ``steps >= 4 * dilate`` each half is
    subdivided again with ``steps`` halved.

    Returns the in-between frames in order, excluding ``a`` and ``b``.
    """
    global index
    index += 1
    if (index % 10) == 0:
        print("{}...".format(index))
    middle_np = _interpolate(moduleNetwork, tensorOutput, a, b)
    if steps < 4 * dilate:
        return [middle_np]
    left = process_tree(moduleNetwork, a, middle_np, tensorOutput, steps // 2, dilate)
    right = process_tree(moduleNetwork, middle_np, b, tensorOutput, steps // 2, dilate)
    return left + [middle_np] + right


def smooth_frames(moduleNetwork, tensorOutput, frames, smooth):
    """Optionally re-interpolate between every other frame.

    When ``smooth`` is false (or ``frames`` is empty) the input list is
    returned unchanged.  Otherwise the result starts with ``frames[0]``, then
    for each odd ``i`` below ``len(frames) - 2`` contains ``frames[i]`` followed
    by the interpolation of ``frames[i]`` and ``frames[i + 2]``, and ends with
    the last ``frames[i + 2]`` used (if any) and the final input frame.
    """
    if not smooth or not frames:
        return frames
    print("smoothing every other frame")
    nextFrame = None
    new_frames = [frames[0]]
    for i in range(1, len(frames) - 2, 2):
        firstFrame = frames[i]
        nextFrame = frames[i + 2]
        middle_np = _interpolate(moduleNetwork, tensorOutput, firstFrame, nextFrame)
        new_frames += [firstFrame, middle_np]
    if nextFrame is not None:
        new_frames += [nextFrame]
    new_frames += [frames[-1]]
    return new_frames


def dilate_frames(moduleNetwork, tensorOutput, frames, dilate):
    """Insert interpolated frames between each consecutive pair of frames.

    When ``dilate < 2`` (or ``frames`` is empty) the input list is returned
    unchanged.  Otherwise every pair of neighbours is expanded with
    ``process_tree(..., dilate, 1)``.
    """
    if dilate < 2 or not frames:
        return frames
    print("dilating by a factor of {}".format(dilate))
    new_frames = []
    nextFrame = frames[0]
    for i in range(1, len(frames)):
        firstFrame = nextFrame
        nextFrame = frames[i]
        new_frames += [firstFrame]
        new_frames += process_tree(moduleNetwork, firstFrame, nextFrame, tensorOutput, dilate, 1)
    new_frames += [nextFrame]
    return new_frames


def store_frames(frames, outputPath, opt, inputFirst=None, inputSecond=None):
    """Write ``frames`` to an .mp4 file and optionally upload it.

    ``inputFirst`` / ``inputSecond``, when given, are written before / after
    the generated frames.  With ``opt.padding`` set, frames from ``opt.first``
    and ``opt.second`` are added around the sequence via ``pad_frames``.
    ``None`` entries in ``frames`` are skipped.  If ``opt.endpoint`` is
    non-empty the finished file is POSTed there with curl.
    """
    if not outputPath.endswith('.mp4'):
        outputPath += '.mp4'
    print('writing {}'.format(outputPath))
    print('frames: {}'.format(len(frames)))
    # NOTE(review): the output size is hard-coded; frames of a different size
    # will presumably not encode correctly.
    writer = FFMPEG_VideoWriter(outputPath, (1024, 512), FPS)
    try:
        if inputFirst is not None:
            writer.write_frame(tensor_to_image(inputFirst))
        if opt.padding:
            pad_frames(writer, opt.first,
                       max(0, opt.a_offset - opt.padding * FPS), opt.a_offset)
        for frame in frames:
            if frame is not None:
                writer.write_frame(tensor_to_image(frame))
        if inputSecond is not None:
            writer.write_frame(tensor_to_image(inputSecond))
        if opt.padding:
            pad_start = opt.b_offset + len(frames)
            pad_frames(writer, opt.second, pad_start, pad_start + opt.padding * FPS)
    finally:
        # Always release the ffmpeg process, even if a frame fails to write.
        writer.close()
    if opt.endpoint != '':
        call(["curl", "-X", "POST",
              "-F", "module=morph",
              "-F", "activity=morph",
              "-F", "generated=true",
              "-F", "dataset=" + opt.dataset,
              "-F", "datatype=video",
              "-F", "should_relay=true",
              "-F", "file=@" + outputPath,
              opt.endpoint])


def pad_frames(writer, base_path, start, end):
    """Write existing ``frame_NNNNN.png`` files numbered ``start`` to ``end - 1``.

    Frame numbers whose file does not exist under ``base_path`` are skipped.
    """
    for frame_number in range(start, end):
        # The loop value already starts at `start`; the previous code added
        # `start` a second time and so read the wrong files.
        fn = os.path.join(base_path, "frame_{:0>5}.png".format(int(frame_number)))
        if os.path.exists(fn):
            print(fn)
            # Hand the writer an RGB uint8 array rather than a PIL image.
            img = numpy.asarray(PIL.Image.open(fn).convert('RGB'))
            writer.write_frame(img)


def tensor_to_image(np_val):
    """Convert a network-layout float array into a uint8 image array.

    Moves axis 0 to the end, reverses the order of the last axis and scales by
    255 (the inverse of ``load_image``).
    """
    return (numpy.rollaxis(np_val, 0, 3)[:, :, ::-1] * 255.0).astype(numpy.uint8)


def load_image(path):
    """Load an image file as a float32 array in network layout.

    The last (channel) axis is reversed and moved to the front, and values are
    divided by 255.
    """
    return numpy.rollaxis(numpy.asarray(PIL.Image.open(path))[:, :, ::-1], 2, 0).astype(numpy.float32) / 255.0


def load_image_tensor(path):
    """Load an image file via ``load_image`` and wrap it in a FloatTensor."""
    return torch.FloatTensor(load_image(path))


if opt.video and opt.video_out:
    # Write each source frame followed by an interpolated in-between frame,
    # at twice the source frame rate.
    # NOTE(review): opt.video is a boolean (see the parser above), so the
    # reader receives True rather than a file path.
    reader = FFMPEG_VideoReader(opt.video, False)
    writer = FFMPEG_VideoWriter(opt.video_out, reader.size, reader.fps * 2)
    reader.initialize()
    nextFrame = reader.read_frame()
    for x in range(0, reader.nframes):
        firstFrame = nextFrame
        nextFrame = reader.read_frame()
        tensorInputFirst = torch.FloatTensor(numpy.rollaxis(firstFrame[:, :, ::-1], 2, 0) / 255.0)
        tensorInputSecond = torch.FloatTensor(numpy.rollaxis(nextFrame[:, :, ::-1], 2, 0) / 255.0)
        process(moduleNetwork, tensorInputFirst, tensorInputSecond, tensorOutput)
        writer.write_frame(firstFrame)
        writer.write_frame(tensor_to_image(tensorOutput.clamp(0.0, 1.0).numpy()))
    writer.write_frame(nextFrame)
    writer.close()
elif opt.mix_videos:
    print("morph two videos...")
    outputPath = './renders/' + opt.video_out
    frames = process_two_videos(moduleNetwork, tensorOutput, opt.first, opt.second,
                                opt.a_offset, opt.b_offset, opt.steps, opt.dilate)
    frames = smooth_frames(moduleNetwork, tensorOutput, frames, opt.smooth)
    frames = dilate_frames(moduleNetwork, tensorOutput, frames, opt.dilate)
    store_frames(frames, outputPath, opt)
elif opt.average_videos:
    print("average two videos...")
    outputPath = './renders/' + opt.video_out
    frames = average_two_videos(moduleNetwork, tensorOutput, opt.first, opt.second,
                                opt.a_offset, opt.b_offset, opt.steps, opt.dilate)
    frames = smooth_frames(moduleNetwork, tensorOutput, frames, opt.smooth)
    frames = dilate_frames(moduleNetwork, tensorOutput, frames, opt.dilate)
    store_frames(frames, outputPath, opt)
elif opt.steps == 0:
    print("generate single morphed image...")
    tensorInputFirst = load_image_tensor(opt.first)
    tensorInputSecond = load_image_tensor(opt.second)
    process(moduleNetwork, tensorInputFirst, tensorInputSecond, tensorOutput)
    PIL.Image.fromarray(tensor_to_image(tensorOutput.clamp(0.0, 1.0).numpy())).save(opt.out)
elif opt.mix_images:
    print("morph two video frames...")
    inputFirst = load_image(os.path.join(opt.first, "frame_{:0>5}.png".format(opt.a_offset + 1)))
    inputSecond = load_image(os.path.join(opt.second, "frame_{:0>5}.png".format(opt.b_offset + 1)))
    outputPath = './renders/' + opt.video_out
    frames = process_tree(moduleNetwork, inputFirst, inputSecond, tensorOutput,
                          opt.steps * opt.dilate, opt.dilate)
    frames = smooth_frames(moduleNetwork, tensorOutput, frames, opt.smooth)
    print("dilate... {}".format(opt.dilate))
    frames = dilate_frames(moduleNetwork, tensorOutput, frames, opt.dilate)
    # Argument order fixed: opt comes before the two endpoint frames.
    store_frames(frames, outputPath, opt, inputFirst, inputSecond)
else:
    print("morph two images...")
    inputFirst = load_image(opt.first)
    inputSecond = load_image(opt.second)
    outputPath = './renders/' + opt.video_out
    frames = process_tree(moduleNetwork, inputFirst, inputSecond, tensorOutput,
                          opt.steps * opt.dilate, opt.dilate)
    frames = smooth_frames(moduleNetwork, tensorOutput, frames, opt.smooth)
    print("dilate... {}".format(opt.dilate))
    frames = dilate_frames(moduleNetwork, tensorOutput, frames, opt.dilate)
    store_frames(frames, outputPath, opt, inputFirst, inputSecond)