path: root/become_yukarin/dataset
author    Hiroshiba Kazuyuki <hihokaruta@gmail.com>    2017-11-06 02:37:05 +0900
committer Hiroshiba Kazuyuki <hihokaruta@gmail.com>    2017-11-06 02:37:05 +0900
commit    be9104a1019104751ff9352a896df0f55946fd05 (patch)
tree      6a028d061ee30dd6dbf1cb4333bb97c8731f13f3 /become_yukarin/dataset
add extract acoustic feature script
Diffstat (limited to 'become_yukarin/dataset')
-rw-r--r--    become_yukarin/dataset/__init__.py      2
-rw-r--r--    become_yukarin/dataset/dataset.py     150
-rw-r--r--    become_yukarin/dataset/utility.py      46
3 files changed, 198 insertions(+), 0 deletions(-)
diff --git a/become_yukarin/dataset/__init__.py b/become_yukarin/dataset/__init__.py
new file mode 100644
index 0000000..cdd8cf4
--- /dev/null
+++ b/become_yukarin/dataset/__init__.py
@@ -0,0 +1,2 @@
+from . import dataset
+from . import utility
diff --git a/become_yukarin/dataset/dataset.py b/become_yukarin/dataset/dataset.py
new file mode 100644
index 0000000..781dbec
--- /dev/null
+++ b/become_yukarin/dataset/dataset.py
@@ -0,0 +1,150 @@
+import json
+import os
+import typing
+from abc import ABCMeta, abstractmethod
+from typing import NamedTuple
+
+import nnmnkwii.preprocessing
+import chainer
+import librosa
+import numpy
+import pysptk
+import pyworld
+
+
+class Wave(NamedTuple):
+    wave: numpy.ndarray
+    sampling_rate: int
+
+
+class AcousticFeature(NamedTuple):
+    f0: numpy.ndarray
+    spectrogram: numpy.ndarray
+    aperiodicity: numpy.ndarray
+    mfcc: numpy.ndarray
+
+
+class BaseDataProcess(metaclass=ABCMeta):
+    @abstractmethod
+    def __call__(self, data, test):
+        pass
+
+
+class ChainProcess(BaseDataProcess):
+    def __init__(self, process: typing.Iterable[BaseDataProcess]):
+        self._process = process
+
+    def __call__(self, data, test):
+        for p in self._process:
+            data = p(data, test)
+        return data
+
+
+class SplitProcess(BaseDataProcess):
+    def __init__(self, process: typing.Dict[str, typing.Optional[BaseDataProcess]]):
+        self._process = process
+
+    def __call__(self, data, test):
+        data = {
+            k: p(data, test) if p is not None else data
+            for k, p in self._process.items()
+        }
+        return data
+
+
+class DataProcessDataset(chainer.dataset.DatasetMixin):
+    def __init__(self, data: typing.List, data_process: BaseDataProcess, test):
+        self._data = data
+        self._data_process = data_process
+        self._test = test
+
+    def __len__(self):
+        return len(self._data)
+
+    def get_example(self, i):
+        return self._data_process(data=self._data[i], test=self._test)
+
+
+class WaveFileLoadProcess(BaseDataProcess):
+    def __init__(self, sample_rate: int, top_db: float):
+        self._sample_rate = sample_rate
+        self._top_db = top_db
+
+    def __call__(self, data: str, test):
+        wave = librosa.core.load(data, sr=self._sample_rate)[0]
+        wave = librosa.effects.remix(wave, intervals=librosa.effects.split(wave, top_db=self._top_db))
+        return Wave(wave, self._sample_rate)
+
+
+class AcousticFeatureProcess(BaseDataProcess):
+    def __init__(self, frame_period, order, alpha):
+        self._frame_period = frame_period
+        self._order = order
+        self._alpha = alpha
+
+    def __call__(self, data: Wave, test):
+        x = data.wave.astype(numpy.float64)
+        fs = data.sampling_rate
+
+        _f0, t = pyworld.dio(x, fs, frame_period=self._frame_period)
+        f0 = pyworld.stonemask(x, _f0, t, fs)
+        spectrogram = pyworld.cheaptrick(x, f0, t, fs)
+        aperiodicity = pyworld.d4c(x, f0, t, fs)
+        mfcc = pysptk.sp2mc(spectrogram, order=self._order, alpha=self._alpha)
+        return AcousticFeature(
+            f0=f0,
+            spectrogram=spectrogram,
+            aperiodicity=aperiodicity,
+            mfcc=mfcc,
+        )
+
+
+# data_process = ChainProcess([
+#     SplitProcess(dict(
+#         input=ChainProcess([
+#             WaveFileLoadProcess(),
+#             AcousticFeatureProcess(),
+#         ]),
+#         target=ChainProcess([
+#             WaveFileLoadProcess(),
+#             AcousticFeatureProcess(),
+#         ]),
+#     )),
+#
+#     PILImageProcess(mode='RGB'),
+#     RandomFlipImageProcess(p_flip_horizontal=0.5, p_flip_vertical=0),
+#     RandomResizeImageProcess(min_short=128, max_short=160),
+#     RandomCropImageProcess(crop_width=128, crop_height=128),
+#     RgbImageArrayProcess(),
+#     SplitProcess({
+#         'target': None,
+#         'raw_line': RawLineImageArrayProcess(),
+#     })
+# ])
+#
+#
+# def choose(config: DatasetConfig):
+#     if config.images_glob is not None:
+#         import glob
+#         paths = glob.glob(config.images_glob)
+#         paths = data_filter(
+#             datas=paths,
+#             keys=list(map(lambda p: os.path.basename(p), paths)),
+#             filter_func=filter_image,
+#             num_process=None,
+#             cache_path=config.cache_path,
+#         )
+#         paths = list(paths)
+#     else:
+#         paths = json.load(open(config.images_list))
+#
+#     num_test = config.num_test
+#     train_paths = paths[num_test:]
+#     test_paths = paths[:num_test]
+#     train_for_evaluate_paths = train_paths[:num_test]
+#
+#     return {
+#         'train': DataProcessDataset(train_paths, data_process, test=False),
+#         'test': DataProcessDataset(test_paths, data_process, test=True),
+#         'train_eval': DataProcessDataset(train_for_evaluate_paths, data_process, test=True),
+#     }
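For context, the classes in dataset.py compose into a preprocessing pipeline along the lines of the commented-out example above. The following is a minimal usage sketch, not part of the commit: the sample rate, top_db, frame period, mel-cepstrum order, and all-pass constant alpha are illustrative placeholder values, and the single wav path is a stand-in for real data.

from become_yukarin.dataset.dataset import (
    AcousticFeatureProcess,
    ChainProcess,
    DataProcessDataset,
    SplitProcess,
    WaveFileLoadProcess,
)

# Load a wav file, strip silent regions, then extract WORLD f0 / spectrogram /
# aperiodicity plus a mel-cepstrum via pysptk.
feature_process = ChainProcess([
    WaveFileLoadProcess(sample_rate=16000, top_db=60),
    AcousticFeatureProcess(frame_period=5.0, order=24, alpha=0.41),
])

# As in the commented-out example, SplitProcess hands the same item to every
# branch, so 'input' and 'target' both receive the same path here.
pair_process = SplitProcess(dict(
    input=feature_process,
    target=feature_process,
))

paths = ['example.wav']  # placeholder path
dataset = DataProcessDataset(paths, pair_process, test=False)
features = dataset.get_example(0)  # {'input': AcousticFeature, 'target': AcousticFeature}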
diff --git a/become_yukarin/dataset/utility.py b/become_yukarin/dataset/utility.py
new file mode 100644
index 0000000..b2f5480
--- /dev/null
+++ b/become_yukarin/dataset/utility.py
@@ -0,0 +1,46 @@
+import fastdtw
+import nnmnkwii.metrics
+import numpy
+import scipy.interpolate
+
+
+class DTWAligner(object):
+    """
+    from https://github.com/r9y9/nnmnkwii/blob/4cade86b5c35b4e35615a2a8162ddc638018af0e/nnmnkwii/preprocessing/alignment.py#L14
+    """
+
+    def __init__(self, x, y, dist=lambda x, y: numpy.linalg.norm(x - y), radius=1):
+        assert x.ndim == 2 and y.ndim == 2
+
+        _, path = fastdtw.fastdtw(x, y, radius=radius, dist=dist)
+        self.normed_path_x = numpy.array(list(map(lambda l: l[0], path))) / len(x)
+        self.normed_path_y = numpy.array(list(map(lambda l: l[1], path))) / len(y)
+
+    def align_x(self, x):
+        path = self._interp_path(self.normed_path_x, len(x))
+        return x[path]
+
+    def align_y(self, y):
+        path = self._interp_path(self.normed_path_y, len(y))
+        return y[path]
+
+    def align(self, x, y):
+        return self.align_x(x), self.align_y(y)
+
+    @staticmethod
+    def align_and_transform(x, y, *args, **kwargs):
+        aligner = DTWAligner(*args, x=x, y=y, **kwargs)
+        return aligner.align(x, y)
+
+    @staticmethod
+    def _interp_path(normed_path: numpy.ndarray, target_length: int):
+        base = numpy.linspace(0, 1, len(normed_path))
+        target = numpy.linspace(0, 1, target_length)
+        path = scipy.interpolate.interp1d(base, normed_path)(target)
+        path = numpy.floor(path * target_length).astype(numpy.int)
+        return path
+
+
+class MFCCAligner(DTWAligner):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, dist=nnmnkwii.metrics.melcd, **kwargs)
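As a usage note (not part of the commit), the aligners in utility.py time-warp two feature sequences of different lengths along a shared fastdtw path. The sketch below uses random arrays purely as stand-ins for real mel-cepstra; the shapes are illustrative.

import numpy

from become_yukarin.dataset.utility import DTWAligner, MFCCAligner

x = numpy.random.rand(120, 25)  # stand-in for source-speaker mel-cepstra (T1, D)
y = numpy.random.rand(150, 25)  # stand-in for target-speaker mel-cepstra (T2, D)

# Euclidean-distance DTW: fit the path once, then index-warp each sequence along it.
aligner = DTWAligner(x, y)
x_warped, y_warped = aligner.align(x, y)

# MFCCAligner only swaps the frame distance for nnmnkwii's mel-cepstral distortion.
x_mc, y_mc = MFCCAligner(x, y).align(x, y)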