diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..26837cd
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,229 @@
+# -*- coding: utf-8 -*-
+import numpy as np
+import tensorflow as tf
+import h5py
+from PIL import Image
+
+def LoadImage(path, color_mode='RGB', channel_mean=None, modcrop=(0, 0, 0, 0)):
+    '''Load an image with PIL, convert it to the given color space, and
+    return it as a numpy array scaled to [0, 1].
+
+    Adapted from Keras' preprocessing.image.load_img / img_to_array:
+    https://github.com/fchollet/keras/blob/master/keras/preprocessing/image.py
+    '''
+    ## Load image
+    img = Image.open(path)
+    if color_mode == 'RGB':
+        cimg = img.convert('RGB')
+        x = np.asarray(cimg, dtype='float32')
+    elif color_mode in ('YCbCr', 'Y'):
+        cimg = img.convert('YCbCr')
+        x = np.asarray(cimg, dtype='float32')
+        if color_mode == 'Y':
+            x = x[:, :, 0:1]
+    else:
+        raise ValueError('Unsupported color_mode: {}'.format(color_mode))
+
+    ## To 0-1
+    x *= 1.0 / 255.0
+
+    if channel_mean is not None:  # per-channel mean subtraction (3-channel input)
+        x[:, :, 0] -= channel_mean[0]
+        x[:, :, 1] -= channel_mean[1]
+        x[:, :, 2] -= channel_mean[2]
+
+    # Crop [top, bottom, left, right] margins; all four must be non-zero,
+    # since a zero margin would make the negative-index slice empty.
+    if modcrop[0] * modcrop[1] * modcrop[2] * modcrop[3]:
+        x = x[modcrop[0]:-modcrop[1], modcrop[2]:-modcrop[3], :]
+
+    return x
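+
+# Usage sketch (hypothetical path): load the luma channel of a frame, already
+# scaled to [0, 1], cropping an 8-pixel margin on every side.
+#   y = LoadImage('frames/000.png', color_mode='Y', modcrop=(8, 8, 8, 8))
+#   assert y.ndim == 3 and y.shape[-1] == 1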
+
+def DownSample(x, h, scale=4):
+    '''Blur a 5-D video tensor x (b, t, h, w, 3) with the 2-D kernel h and
+    decimate it by `scale` in both spatial dimensions. Frames are folded into
+    the batch axis so one depthwise convolution processes the whole clip.'''
+    ds_x = tf.shape(x)
+    x = tf.reshape(x, [ds_x[0] * ds_x[1], ds_x[2], ds_x[3], 3])
+
+    # Reflect padding
+    W = tf.constant(h)
+
+    # Derive the padding from the kernel itself (h has shape (kh, kw, 1, 1))
+    # instead of hard-coding a 13x13 filter.
+    filter_height, filter_width = h.shape[0], h.shape[1]
+    pad_height = filter_height - 1
+    pad_width = filter_width - 1
+
+    # When pad_height (pad_width) is odd, we pad more to bottom (right),
+    # following the same convention as conv2d().
+    pad_top = pad_height // 2
+    pad_bottom = pad_height - pad_top
+    pad_left = pad_width // 2
+    pad_right = pad_width - pad_left
+    pad_array = [[0, 0], [pad_top, pad_bottom], [pad_left, pad_right], [0, 0]]
+
+    # Tile the single-channel kernel across RGB and apply it depthwise.
+    depthwise_F = tf.tile(W, [1, 1, 3, 1])
+    y = tf.nn.depthwise_conv2d(tf.pad(x, pad_array, mode='REFLECT'), depthwise_F, [1, scale, scale, 1], 'VALID')
+
+    ds_y = tf.shape(y)
+    y = tf.reshape(y, [ds_x[0], ds_x[1], ds_y[1], ds_y[2], 3])
+    return y
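+
+# Usage sketch: blur-and-decimate a clip of HR frames. The normalized box
+# filter below is a stand-in; the real pipeline supplies its own
+# (e.g. Gaussian) kernel of shape (kh, kw, 1, 1).
+#   h = np.ones((13, 13, 1, 1), dtype=np.float32) / 169.0
+#   lr = DownSample(hr, h, scale=4)  # hr: (b, t, H, W, 3) -> lr: (b, t, H/4, W/4, 3)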
+
+def _rgb2ycbcr(img, maxVal=255):
+    '''Convert an RGB image to YCbCr with the ITU-R BT.601 matrix.
+    `maxVal` is the input range: 255 for 8-bit-scaled data, 1 for [0, 1].'''
+    O = np.array([[16],
+                  [128],
+                  [128]])
+    T = np.array([[0.256788235294118, 0.504129411764706, 0.097905882352941],
+                  [-0.148223529411765, -0.290992156862745, 0.439215686274510],
+                  [0.439215686274510, -0.367788235294118, -0.071427450980392]])
+
+    if maxVal == 1:
+        O = O / 255.0
+
+    t = np.reshape(img, (img.shape[0] * img.shape[1], img.shape[2]))
+    t = np.dot(t, np.transpose(T))
+    t[:, 0] += O[0]
+    t[:, 1] += O[1]
+    t[:, 2] += O[2]
+    ycbcr = np.reshape(t, [img.shape[0], img.shape[1], img.shape[2]])
+
+    return ycbcr
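+
+# Sanity check: with the BT.601 matrix above, pure white lands at the top of
+# the video-range luma scale.
+#   _rgb2ycbcr(np.full((1, 1, 3), 255.0))[0, 0, 0]   # ~235.0 (= 16 + 219)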
+
+def to_uint8(x, vmin, vmax):
+    '''Rescale x from [vmin, vmax] to [0, 255] and round. The result is kept
+    as float on purpose: a true uint8 cast would wrap around when the arrays
+    are subtracted later.'''
+    x = x.astype('float32')
+    x = (x - vmin) / (vmax - vmin) * 255  # 0~255
+    return np.clip(np.round(x), 0, 255)
+
+def AVG_PSNR(vid_true, vid_pred, vmin=0, vmax=255, t_border=2, sp_border=8, is_T_Y=False, is_P_Y=False):
+    '''Average per-frame PSNR computed on the Y channel. Inputs are converted
+    from RGB to YCbCr unless is_T_Y / is_P_Y mark them as already Y-only, and
+    t_border frames / sp_border pixels are cropped before scoring.'''
+    input_shape = vid_pred.shape
+    if is_T_Y:
+        Y_true = to_uint8(vid_true, vmin, vmax)
+    else:
+        Y_true = np.empty(input_shape[:-1])
+        for t in range(input_shape[0]):
+            Y_true[t] = _rgb2ycbcr(to_uint8(vid_true[t], vmin, vmax), 255)[:, :, 0]
+
+    if is_P_Y:
+        Y_pred = to_uint8(vid_pred, vmin, vmax)
+    else:
+        Y_pred = np.empty(input_shape[:-1])
+        for t in range(input_shape[0]):
+            Y_pred[t] = _rgb2ycbcr(to_uint8(vid_pred[t], vmin, vmax), 255)[:, :, 0]
+
+    diff = Y_true - Y_pred
+    diff = diff[t_border:input_shape[0] - t_border, sp_border:input_shape[1] - sp_border, sp_border:input_shape[2] - sp_border]
+
+    psnrs = []
+    for t in range(diff.shape[0]):
+        rmse = np.sqrt(np.mean(np.power(diff[t], 2)))
+        psnrs.append(20 * np.log10(255. / rmse))
+
+    return np.mean(np.asarray(psnrs))
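+
+# Usage sketch: per-frame PSNR is 20*log10(255/RMSE) on the cropped Y channel,
+# averaged over frames. Identical clips give an infinite score, so only
+# distinct videos should be compared.
+#   score = AVG_PSNR(gt, pred, vmin=0, vmax=1)  # gt, pred: (t, h, w, 3) in [0, 1]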
+
+
+he_normal_init = tf.contrib.layers.variance_scaling_initializer(factor=2.0, mode='FAN_IN', uniform=False)
+
+def BatchNorm(input, is_train, decay=0.999, name='BatchNorm'):
+    '''
+    https://github.com/zsdonghao/tensorlayer/blob/master/tensorlayer/layers.py
+    https://github.com/ry/tensorflow-resnet/blob/master/resnet.py
+    http://stackoverflow.com/questions/38312668/how-does-one-do-inference-with-batch-normalization-with-tensor-flow
+    '''
+    from tensorflow.python.training import moving_averages
+    from tensorflow.python.ops import control_flow_ops
+
+    axis = list(range(len(input.get_shape()) - 1))
+    fdim = input.get_shape()[-1:]
+
+    with tf.variable_scope(name):
+        beta = tf.get_variable('beta', fdim, initializer=tf.constant_initializer(value=0.0))
+        gamma = tf.get_variable('gamma', fdim, initializer=tf.constant_initializer(value=1.0))
+        moving_mean = tf.get_variable('moving_mean', fdim, initializer=tf.constant_initializer(value=0.0), trainable=False)
+        # Initialize the moving variance to 1.0, not 0.0: with zero variance,
+        # inference before the first update would divide by sqrt(epsilon).
+        moving_variance = tf.get_variable('moving_variance', fdim, initializer=tf.constant_initializer(value=1.0), trainable=False)
+
+        def mean_var_with_update():
+            batch_mean, batch_variance = tf.nn.moments(input, axis)
+            update_moving_mean = moving_averages.assign_moving_average(moving_mean, batch_mean, decay, zero_debias=True)
+            update_moving_variance = moving_averages.assign_moving_average(moving_variance, batch_variance, decay, zero_debias=True)
+            with tf.control_dependencies([update_moving_mean, update_moving_variance]):
+                return tf.identity(batch_mean), tf.identity(batch_variance)
+
+        mean, variance = control_flow_ops.cond(is_train, mean_var_with_update, lambda: (moving_mean, moving_variance))
+
+        return tf.nn.batch_normalization(input, mean, variance, beta, gamma, 1e-3)
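+
+# Usage sketch: is_train must be a scalar boolean tensor so the same graph can
+# switch between batch statistics (training) and moving averages (inference).
+#   is_train = tf.placeholder(tf.bool, shape=[])
+#   h = BatchNorm(conv_out, is_train, name='bn1')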
+
+def Conv3D(input, kernel_shape, strides, padding, name='Conv3d', W_initializer=he_normal_init, bias=True):
+    with tf.variable_scope(name):
+        W = tf.get_variable("W", kernel_shape, initializer=W_initializer)
+        if bias:
+            b = tf.get_variable("b", [kernel_shape[-1]], initializer=tf.constant_initializer(value=0.0))
+        else:
+            b = 0
+
+        return tf.nn.conv3d(input, W, strides, padding) + b
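+
+# Usage sketch: a 3x3x3 convolution mapping 64 -> 64 channels with unit stride.
+# Kernel shape is (kt, kh, kw, in_channels, out_channels).
+#   out = Conv3D(x, [3, 3, 3, 64, 64], [1, 1, 1, 1, 1], 'SAME', name='conv1')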
+
+def LoadParams(sess, params, in_file='parmas.hdf5'):
+    f = h5py.File(in_file, 'r')
+    g = f['params']
+    assign_ops = []
+    # Flatten the list of per-layer variable lists
+    params = [item for sublist in params for item in sublist]
+
+    for param in params:
+        flag = False
+        for idx, name in enumerate(g):
+            # HDF5 keys were stored with '/' replaced by '_'; undo the mapping
+            # (lone underscores become '/', doubled ones collapse back to '_').
+            parsed_name = list(name)
+            for i in range(1, len(parsed_name) - 1):
+                if parsed_name[i] == '_' and (parsed_name[i-1] != '_' and parsed_name[i+1] != '_'):
+                    parsed_name[i] = '/'
+            parsed_name = ''.join(parsed_name)
+            parsed_name = parsed_name.replace('__', '_')
+
+            if param.name == parsed_name:
+                flag = True
+                assign_ops += [param.assign(g[name][()])]
+
+        if not flag:
+            print('Warning: could not find param {}; ignore if intended.'.format(param.name))
+
+    sess.run(assign_ops)
+    f.close()
+
+    print('Parameters are loaded')
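+
+# Usage sketch: `params` is a list of per-layer variable lists (it is flattened
+# internally); the file name here is hypothetical.
+#   LoadParams(sess, [tf.trainable_variables()], in_file='snapshot.hdf5')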
+
+def depth_to_space_3D(x, block_size):
+    '''Apply tf.depth_to_space frame-wise to a 5-D tensor (b, t, h, w, c) by
+    folding the time axis into the batch axis.'''
+    ds_x = tf.shape(x)
+    x = tf.reshape(x, [ds_x[0] * ds_x[1], ds_x[2], ds_x[3], ds_x[4]])
+
+    y = tf.depth_to_space(x, block_size)
+
+    ds_y = tf.shape(y)
+    x = tf.reshape(y, [ds_x[0], ds_x[1], ds_y[1], ds_y[2], ds_y[3]])
+    return x
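+
+# Usage sketch: with block_size=4, a (b, t, h, w, 48) tensor becomes
+# (b, t, 4h, 4w, 3) -- the standard sub-pixel upscaling step, per frame.
+#   y = depth_to_space_3D(x, 4)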
+
+def DynFilter3D(x, F, filter_size):
+    '''
+    3D dynamic filtering
+    x:           (b, t, h, w) input frames, with t == filter_size[0]
+    F:           (b, h, w, prod(filter_size), output_depth) per-pixel filters
+    filter_size: (ft, fh, fw)
+    '''
+    # Build a fixed "tower" filter: an identity matrix reshaped so conv2d
+    # expands each local ft x fh x fw neighborhood into the channel axis.
+    filter_localexpand_np = np.reshape(np.eye(np.prod(filter_size)), (filter_size[1], filter_size[2], filter_size[0], np.prod(filter_size)))
+    filter_localexpand = tf.Variable(filter_localexpand_np, trainable=False, dtype='float32', name='filter_localexpand')
+    x = tf.transpose(x, perm=[0, 2, 3, 1])
+    x_localexpand = tf.nn.conv2d(x, filter_localexpand, [1, 1, 1, 1], 'SAME')  # (b, h, w, ft*fh*fw)
+    x_localexpand = tf.expand_dims(x_localexpand, axis=3)  # (b, h, w, 1, ft*fh*fw)
+    x = tf.matmul(x_localexpand, F)  # (b, h, w, 1, output_depth)
+    x = tf.squeeze(x, axis=3)  # (b, h, w, output_depth)
+
+    return x
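+
+# Usage sketch: apply per-pixel 1x5x5 filters predicted by a network, as in
+# dynamic-upsampling approaches. Shapes assume filter_size == (1, 5, 5).
+#   out = DynFilter3D(frames, filters, (1, 5, 5))
+#   # frames: (b, 1, h, w), filters: (b, h, w, 25, R*R) -> out: (b, h, w, R*R)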
+
+def Huber(y_true, y_pred, delta, axis=None):
+    abs_error = tf.abs(y_pred - y_true)
+    quadratic = tf.minimum(abs_error, delta)
+    # The following expression is the same in value as
+    # tf.maximum(abs_error - delta, 0), but importantly the gradient for the
+    # expression when abs_error == delta is 0 (for tf.maximum it would be 1).
+    # This is necessary to avoid doubling the gradient, since there is already a
+    # nonzero contribution to the gradient from the quadratic term.
+    linear = (abs_error - quadratic)
+    losses = 0.5 * quadratic**2 + delta * linear
+    return tf.reduce_mean(losses, axis=axis)
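+
+# Usage sketch: delta sets the quadratic-to-linear crossover; small deltas make
+# the loss behave like a scaled L1 for all but tiny residuals.
+#   loss = Huber(y_true, y_pred, delta=0.01)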