diff options
Diffstat (limited to 'utils.py')
| -rw-r--r-- | utils.py | 229 |
1 files changed, 229 insertions, 0 deletions
diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..26837cd --- /dev/null +++ b/utils.py @@ -0,0 +1,229 @@ +# -*- coding: utf-8 -*- +import numpy as np +import tensorflow as tf +import h5py +from PIL import Image + +def LoadImage(path, color_mode='RGB', channel_mean=None, modcrop=[0,0,0,0]): + '''Load an image using PIL and convert it into specified color space, + and return it as an numpy array. + + https://github.com/fchollet/keras/blob/master/keras/preprocessing/image.py + The code is modified from Keras.preprocessing.image.load_img, img_to_array. + ''' + ## Load image + img = Image.open(path) + if color_mode == 'RGB': + cimg = img.convert('RGB') + x = np.asarray(cimg, dtype='float32') + + elif color_mode == 'YCbCr' or color_mode == 'Y': + cimg = img.convert('YCbCr') + x = np.asarray(cimg, dtype='float32') + if color_mode == 'Y': + x = x[:,:,0:1] + + ## To 0-1 + x *= 1.0/255.0 + + if channel_mean: + x[:,:,0] -= channel_mean[0] + x[:,:,1] -= channel_mean[1] + x[:,:,2] -= channel_mean[2] + + if modcrop[0]*modcrop[1]*modcrop[2]*modcrop[3]: + x = x[modcrop[0]:-modcrop[1], modcrop[2]:-modcrop[3], :] + + return x + +def DownSample(x, h, scale=4): + ds_x = tf.shape(x) + x = tf.reshape(x, [ds_x[0]*ds_x[1], ds_x[2], ds_x[3], 3]) + + # Reflect padding + W = tf.constant(h) + + filter_height, filter_width = 13, 13 + pad_height = filter_height - 1 + pad_width = filter_width - 1 + + # When pad_height (pad_width) is odd, we pad more to bottom (right), + # following the same convention as conv2d(). + pad_top = pad_height // 2 + pad_bottom = pad_height - pad_top + pad_left = pad_width // 2 + pad_right = pad_width - pad_left + pad_array = [[0,0], [pad_top, pad_bottom], [pad_left, pad_right], [0,0]] + + depthwise_F = tf.tile(W, [1, 1, 3, 1]) + y = tf.nn.depthwise_conv2d(tf.pad(x, pad_array, mode='REFLECT'), depthwise_F, [1, scale, scale, 1], 'VALID') + + ds_y = tf.shape(y) + y = tf.reshape(y, [ds_x[0], ds_x[1], ds_y[1], ds_y[2], 3]) + return y + +def _rgb2ycbcr(img, maxVal=255): + O = np.array([[16], + [128], + [128]]) + T = np.array([[0.256788235294118, 0.504129411764706, 0.097905882352941], + [-0.148223529411765, -0.290992156862745, 0.439215686274510], + [0.439215686274510, -0.367788235294118, -0.071427450980392]]) + + if maxVal == 1: + O = O / 255.0 + + t = np.reshape(img, (img.shape[0]*img.shape[1], img.shape[2])) + t = np.dot(t, np.transpose(T)) + t[:, 0] += O[0] + t[:, 1] += O[1] + t[:, 2] += O[2] + ycbcr = np.reshape(t, [img.shape[0], img.shape[1], img.shape[2]]) + + return ycbcr + +def to_uint8(x, vmin, vmax): + x = x.astype('float32') + x = (x-vmin)/(vmax-vmin)*255 # 0~255 + return np.clip(np.round(x), 0, 255) + +def AVG_PSNR(vid_true, vid_pred, vmin=0, vmax=255, t_border=2, sp_border=8, is_T_Y=False, is_P_Y=False): + ''' + This include RGB2ycbcr and VPSNR computed in Y + ''' + input_shape = vid_pred.shape + if is_T_Y: + Y_true = to_uint8(vid_true, vmin, vmax) + else: + Y_true = np.empty(input_shape[:-1]) + for t in range(input_shape[0]): + Y_true[t] = _rgb2ycbcr(to_uint8(vid_true[t], vmin, vmax), 255)[:,:,0] + + if is_P_Y: + Y_pred = to_uint8(vid_pred, vmin, vmax) + else: + Y_pred = np.empty(input_shape[:-1]) + for t in range(input_shape[0]): + Y_pred[t] = _rgb2ycbcr(to_uint8(vid_pred[t], vmin, vmax), 255)[:,:,0] + + diff = Y_true - Y_pred + diff = diff[t_border: input_shape[0]- t_border, sp_border: input_shape[1]- sp_border, sp_border: input_shape[2]- sp_border] + + psnrs = [] + for t in range(diff.shape[0]): + rmse = np.sqrt(np.mean(np.power(diff[t],2))) + psnrs.append(20*np.log10(255./rmse)) + + return np.mean(np.asarray(psnrs)) + + +he_normal_init = tf.contrib.layers.variance_scaling_initializer(factor=2.0, mode='FAN_IN', uniform=False) + +def BatchNorm(input, is_train, decay=0.999, name='BatchNorm'): + ''' + https://github.com/zsdonghao/tensorlayer/blob/master/tensorlayer/layers.py + https://github.com/ry/tensorflow-resnet/blob/master/resnet.py + http://stackoverflow.com/questions/38312668/how-does-one-do-inference-with-batch-normalization-with-tensor-flow + ''' + from tensorflow.python.training import moving_averages + from tensorflow.python.ops import control_flow_ops + + axis = list(range(len(input.get_shape()) - 1)) + fdim = input.get_shape()[-1:] + + with tf.variable_scope(name): + beta = tf.get_variable('beta', fdim, initializer=tf.constant_initializer(value=0.0)) + gamma = tf.get_variable('gamma', fdim, initializer=tf.constant_initializer(value=1.0)) + moving_mean = tf.get_variable('moving_mean', fdim, initializer=tf.constant_initializer(value=0.0), trainable=False) + moving_variance = tf.get_variable('moving_variance', fdim, initializer=tf.constant_initializer(value=0.0), trainable=False) + + def mean_var_with_update(): + batch_mean, batch_variance = tf.nn.moments(input, axis) + update_moving_mean = moving_averages.assign_moving_average(moving_mean, batch_mean, decay, zero_debias=True) + update_moving_variance = moving_averages.assign_moving_average(moving_variance, batch_variance, decay, zero_debias=True) + with tf.control_dependencies([update_moving_mean, update_moving_variance]): + return tf.identity(batch_mean), tf.identity(batch_variance) + + mean, variance = control_flow_ops.cond(is_train, mean_var_with_update, lambda: (moving_mean, moving_variance)) + + return tf.nn.batch_normalization(input, mean, variance, beta, gamma, 1e-3) #, tf.stack([mean[0], variance[0], beta[0], gamma[0]]) + +def Conv3D(input, kernel_shape, strides, padding, name='Conv3d', W_initializer=he_normal_init, bias=True): + with tf.variable_scope(name): + W = tf.get_variable("W", kernel_shape, initializer=W_initializer) + if bias is True: + b = tf.get_variable("b", (kernel_shape[-1]),initializer=tf.constant_initializer(value=0.0)) + else: + b = 0 + + return tf.nn.conv3d(input, W, strides, padding) + b + +def LoadParams(sess, params, in_file='parmas.hdf5'): + f = h5py.File(in_file, 'r') + g = f['params'] + assign_ops = [] + # Flatten list + params = [item for sublist in params for item in sublist] + + for param in params: + flag = False + for idx, name in enumerate(g): + # + parsed_name = list(name) + for i in range(0+1, len(parsed_name)-1): + if parsed_name[i] == '_' and (parsed_name[i-1] != '_' and parsed_name[i+1] != '_'): + parsed_name[i] = '/' + parsed_name = ''.join(parsed_name) + parsed_name = parsed_name.replace('__','_') + + if param.name == parsed_name: + flag = True +# print(param.name) + assign_ops += [param.assign(g[name][()])] + + if not flag: + print('Warning::Cant find param: {}, ignore if intended.'.format(param.name)) + + sess.run(assign_ops) + + print('Parameters are loaded') + +def depth_to_space_3D(x, block_size): + ds_x = tf.shape(x) + x = tf.reshape(x, [ds_x[0]*ds_x[1], ds_x[2], ds_x[3], ds_x[4]]) + + y = tf.depth_to_space(x, block_size) + + ds_y = tf.shape(y) + x = tf.reshape(y, [ds_x[0], ds_x[1], ds_y[1], ds_y[2], ds_y[3]]) + return x + +def DynFilter3D(x, F, filter_size): + ''' + 3D Dynamic filtering + input x: (b, t, h, w) + F: (b, h, w, tower_depth, output_depth) + filter_shape (ft, fh, fw) + ''' + # make tower + filter_localexpand_np = np.reshape(np.eye(np.prod(filter_size), np.prod(filter_size)), (filter_size[1], filter_size[2], filter_size[0], np.prod(filter_size))) + filter_localexpand = tf.Variable(filter_localexpand_np, trainable=False, dtype='float32',name='filter_localexpand') + x = tf.transpose(x, perm=[0,2,3,1]) + x_localexpand = tf.nn.conv2d(x, filter_localexpand, [1,1,1,1], 'SAME') # b, h, w, 1*5*5 + x_localexpand = tf.expand_dims(x_localexpand, axis=3) # b, h, w, 1, 1*5*5 + x = tf.matmul(x_localexpand, F) # b, h, w, 1, R*R + x = tf.squeeze(x, axis=3) # b, h, w, R*R + + return x + +def Huber(y_true, y_pred, delta, axis=None): + abs_error = tf.abs(y_pred - y_true) + quadratic = tf.minimum(abs_error, delta) + # The following expression is the same in value as + # tf.maximum(abs_error - delta, 0), but importantly the gradient for the + # expression when abs_error == delta is 0 (for tf.maximum it would be 1). + # This is necessary to avoid doubling the gradient, since there is already a + # nonzero contribution to the gradient from the quadratic term. + linear = (abs_error - quadratic) + losses = 0.5 * quadratic**2 + delta * linear + return tf.reduce_mean(losses, axis=axis) |
