Diffstat (limited to 'become_yukarin/voice_changer.py')
-rw-r--r--  become_yukarin/voice_changer.py  243
1 file changed, 78 insertions(+), 165 deletions(-)
diff --git a/become_yukarin/voice_changer.py b/become_yukarin/voice_changer.py
index 3b75fb1..bed155f 100644
--- a/become_yukarin/voice_changer.py
+++ b/become_yukarin/voice_changer.py
@@ -1,4 +1,5 @@
-from typing import List
+from abc import ABCMeta, abstractmethod
+from typing import List, Callable, Any
from typing import NamedTuple
import numpy
@@ -24,16 +25,6 @@ class VoiceChanger(object):
self.super_resolution = super_resolution
self.output_sampling_rate = output_sampling_rate
- # def convert_from_wave_path(self, wave_path: str):
- # w_in = self.acoustic_converter._wave_process(wave_path)
- # return self.convert_from_wave(w_in)
- #
- # def convert_from_wave(self, wave: Wave):
- # f_in = self.acoustic_converter._feature_process(wave)
- # f_high = self.convert_from_acoustic_feature(f_in)
- # wave = self.vocoder.decode(f_high)
- # return wave
-
def convert_from_acoustic_feature(self, f_in: AcousticFeature):
f_low = self.acoustic_converter.convert_to_feature(f_in)
s_high = self.super_resolution.convert(f_low.spectrogram.astype(numpy.float32))
@@ -41,7 +32,21 @@ class VoiceChanger(object):
return f_high
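
Note: the commented-out wrappers removed above reconstruct to the wave-to-wave path below. VoiceChanger holds no vocoder in this revision (VoiceChangerStream does), which is presumably why the block was dead code and got deleted rather than revived:

    # reconstructed from the deleted comment block; self.vocoder does not
    # exist on VoiceChanger here, so this is documentation, not working code
    def convert_from_wave(self, wave: Wave) -> Wave:
        f_in = self.acoustic_converter._feature_process(wave)
        f_high = self.convert_from_acoustic_feature(f_in)
        return self.vocoder.decode(f_high)
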
-class FeatureSegment(NamedTuple):
+class BaseSegment(metaclass=ABCMeta):
+ start_time: float
+
+ @property
+ @abstractmethod
+ def time_length(self) -> float:
+ pass
+
+ @property
+ @abstractmethod
+ def end_time(self) -> float:
+ pass
+
+
+class FeatureSegment(NamedTuple):  # conforms to BaseSegment structurally; NamedTuple forbids extra bases
start_time: float
feature: AcousticFeature
frame_period: float
@@ -55,7 +60,7 @@ class FeatureSegment(NamedTuple):
return self.time_length + self.start_time
-class Segment(NamedTuple):
+class WaveSegment(NamedTuple):  # likewise conforms to BaseSegment structurally
start_time: float
wave: Wave
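
Note: typing.NamedTuple does not support multiple inheritance, which is why the two segment classes above can only match BaseSegment structurally instead of subclassing it (the List[BaseSegment] annotation in fetch() below is therefore duck-typed at runtime). On Python 3.8+ the same contract could be stated checkably with a Protocol; a minimal sketch of that alternative, not part of this commit:

    from typing import Protocol

    class SegmentLike(Protocol):
        """Structural type shared by WaveSegment and FeatureSegment."""
        start_time: float

        @property
        def time_length(self) -> float: ...

        @property
        def end_time(self) -> float: ...
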
@@ -81,7 +86,7 @@ class VoiceChangerStream(object):
self.voice_changer: VoiceChanger = None
self.vocoder: Vocoder = None
- self._data_stream = [] # type: List[Segment]
+ self._data_stream = [] # type: List[WaveSegment]
self._in_feature_stream = [] # type: List[FeatureSegment]
self._out_feature_stream = [] # type: List[FeatureSegment]
@@ -90,7 +95,7 @@ class VoiceChangerStream(object):
assert wave.sampling_rate == self.sampling_rate
assert wave.wave.dtype == self.in_dtype
- segment = Segment(start_time=start_time, wave=wave)
+ segment = WaveSegment(start_time=start_time, wave=wave)
self._data_stream.append(segment)
def add_in_feature(self, start_time: float, feature: AcousticFeature, frame_period: float):
@@ -113,23 +118,31 @@ class VoiceChangerStream(object):
self._in_feature_stream = list(filter(lambda s: s.end_time > end_time, self._in_feature_stream))
self._out_feature_stream = list(filter(lambda s: s.end_time > end_time, self._out_feature_stream))
- def pre_convert(self, start_time: float, time_length: float, extra_time: float):
+ @staticmethod
+ def fetch(
+ start_time: float,
+ time_length: float,
+ data_stream: List[BaseSegment],
+ rate: float,
+ pad_function: Callable[[int], Any],
+ pick_function: Callable[[Any, int, int], Any],
+ concat_function: Callable[[List], Any],
+ extra_time: float = 0,
+ ):
start_time -= extra_time
time_length += extra_time * 2
end_time = start_time + time_length
buffer_list = []
- stream = filter(lambda s: not (end_time < s.start_time or s.end_time < start_time), self._data_stream)
+ stream = filter(lambda s: not (end_time < s.start_time or s.end_time < start_time), data_stream)
start_time_buffer = start_time
remaining_time = time_length
for segment in stream:
# padding
if segment.start_time > start_time_buffer:
- pad = numpy.zeros(
- shape=int((segment.start_time - start_time_buffer) * self.sampling_rate),
- dtype=self.in_dtype,
- )
+ length = int((segment.start_time - start_time_buffer) * rate)
+ pad = pad_function(length)
buffer_list.append(pad)
start_time_buffer = segment.start_time
@@ -138,9 +151,9 @@ class VoiceChangerStream(object):
else:
one_time_length = remaining_time
- first_index = int((start_time_buffer - segment.start_time) * self.sampling_rate)
- last_index = int(first_index + one_time_length * self.sampling_rate)
- one_buffer = segment.wave.wave[first_index:last_index]
+ first_index = int((start_time_buffer - segment.start_time) * rate)
+ last_index = int(first_index + one_time_length * rate)
+ one_buffer = pick_function(segment, first_index, last_index)
buffer_list.append(one_buffer)
start_time_buffer += one_time_length
@@ -150,163 +163,63 @@ class VoiceChangerStream(object):
break
else:
# last padding
- pad = numpy.zeros(shape=int((end_time - start_time_buffer) * self.sampling_rate), dtype=self.in_dtype)
+ length = int((end_time - start_time_buffer) * rate)
+ pad = pad_function(length)
buffer_list.append(pad)
- buffer = numpy.concatenate(buffer_list)
- in_wave = Wave(wave=buffer, sampling_rate=self.sampling_rate)
+ buffer = concat_function(buffer_list)
+ return buffer
+
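The new fetch() is the heart of this refactor: it cuts one contiguous window [start_time, start_time + time_length] out of a list of possibly gapped segments, zero-padding any uncovered span, and delegates the type-specific work to the pad/pick/concat callbacks. A self-contained toy of the same windowing logic over raw numpy arrays (Seg and all values are illustrative only; the real fetch() also pre-filters segments that miss the window, which the toy skips because both segments overlap it):

    import numpy
    from typing import NamedTuple

    class Seg(NamedTuple):  # toy stand-in for WaveSegment
        start_time: float
        data: numpy.ndarray
        rate: float

        @property
        def end_time(self) -> float:
            return self.start_time + len(self.data) / self.rate

    rate = 10.0  # samples per second
    segs = [Seg(0.0, numpy.ones(10), rate), Seg(2.0, numpy.full(10, 2.0), rate)]

    cursor, end = 0.5, 2.5  # requested window: [0.5, 2.5)
    out = []
    for s in segs:
        if s.start_time > cursor:  # gap before this segment -> zero padding
            out.append(numpy.zeros(int((s.start_time - cursor) * rate)))
            cursor = s.start_time
        take = min(end, s.end_time) - cursor  # overlap of segment and window
        first = int((cursor - s.start_time) * rate)
        out.append(s.data[first:first + int(take * rate)])
        cursor += take
        if cursor >= end:
            break
    else:  # segments exhausted early -> trailing padding (fetch's for/else)
        out.append(numpy.zeros(int((end - cursor) * rate)))

    buffer = numpy.concatenate(out)
    assert len(buffer) == 20  # 5 ones, 10 zeros, 5 twos
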
+ def pre_convert(self, start_time: float, time_length: float, extra_time: float):
+ wave = self.fetch(
+ start_time=start_time,
+ time_length=time_length,
+ extra_time=extra_time,
+ data_stream=self._data_stream,
+ rate=self.sampling_rate,
+ pad_function=lambda length: numpy.zeros(shape=length, dtype=self.in_dtype),
+ pick_function=lambda segment, first, last: segment.wave.wave[first:last],
+ concat_function=numpy.concatenate,
+ )
+ in_wave = Wave(wave=wave, sampling_rate=self.sampling_rate)
in_feature = self.vocoder.encode(in_wave)
pad = int(extra_time / (self.vocoder.acoustic_feature_param.frame_period / 1000))
- in_feature = AcousticFeature(
- f0=in_feature.f0[pad:-pad],
- spectrogram=in_feature.spectrogram[pad:-pad],
- aperiodicity=in_feature.aperiodicity[pad:-pad],
- mfcc=in_feature.mfcc[pad:-pad],
- voiced=in_feature.voiced[pad:-pad],
- )
+ in_feature = in_feature.pick(pad, -pad)
return in_feature
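
pre_convert() now calls AcousticFeature.pick(), which this diff assumes but defines elsewhere. Judging from the inline slicing it replaces, pick(first, last) applies the same slice to every per-frame array; a plausible reconstruction (hypothetical, and a real version would have to skip placeholder fields such as a spectrogram set to numpy.nan):

    def pick(self, first: int, last: int) -> 'AcousticFeature':
        # same [first:last] slice on every field, mirroring the removed
        # f0[pad:-pad] / spectrogram[pad:-pad] / ... code above
        return AcousticFeature(
            f0=self.f0[first:last],
            spectrogram=self.spectrogram[first:last],
            aperiodicity=self.aperiodicity[first:last],
            mfcc=self.mfcc[first:last],
            voiced=self.voiced[first:last],
        )
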
def convert(self, start_time: float, time_length: float, extra_time: float):
- start_time -= extra_time
- time_length += extra_time * 2
-
order = self.voice_changer.acoustic_converter.config.dataset.param.acoustic_feature_param.order
-
- end_time = start_time + time_length
- f0_buffer_list = []
- mfcc_buffer_list = []
- ap_buffer_list = []
- voiced_buffer_list = []
- stream = filter(lambda s: not (end_time < s.start_time or s.end_time < start_time), self._in_feature_stream)
-
- start_time_buffer = start_time
- remaining_time = time_length
- for segment in stream:
- # padding
- if segment.start_time > start_time_buffer:
- pad_size = int((segment.start_time - start_time_buffer) * 1000 / self.frame_period)
- dims = AcousticFeature.get_sizes(self.sampling_rate, order)
-
- f0_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
- mfcc_buffer_list.append(numpy.zeros(shape=[pad_size, dims['mfcc']], dtype=self.in_dtype))
- ap_buffer_list.append(numpy.zeros(shape=[pad_size, dims['aperiodicity']], dtype=self.in_dtype))
- voiced_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=numpy.bool))
-
- start_time_buffer = segment.start_time
- if remaining_time > segment.end_time - start_time_buffer:
- one_time_length = segment.end_time - start_time_buffer
- else:
- one_time_length = remaining_time
-
- first_index = int((start_time_buffer - segment.start_time) * 1000 / self.frame_period)
- last_index = int(first_index + one_time_length * 1000 / self.frame_period)
-
- f0_buffer_list.append(segment.feature.f0[first_index:last_index])
- mfcc_buffer_list.append(segment.feature.mfcc[first_index:last_index])
- ap_buffer_list.append(segment.feature.aperiodicity[first_index:last_index])
- voiced_buffer_list.append(segment.feature.voiced[first_index:last_index])
-
- start_time_buffer += one_time_length
- remaining_time -= one_time_length
-
- if start_time_buffer >= end_time:
- break
- else:
- # last padding
- pad_size = int((end_time - start_time_buffer) * 1000 / self.frame_period)
- dims = AcousticFeature.get_sizes(self.sampling_rate, order)
-
- f0_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
- mfcc_buffer_list.append(numpy.zeros(shape=[pad_size, dims['mfcc']], dtype=self.in_dtype))
- ap_buffer_list.append(numpy.zeros(shape=[pad_size, dims['aperiodicity']], dtype=self.in_dtype))
- voiced_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=numpy.bool))
-
- f0 = numpy.concatenate(f0_buffer_list)
- mfcc = numpy.concatenate(mfcc_buffer_list)
- aperiodicity = numpy.concatenate(ap_buffer_list)
- voiced = numpy.concatenate(voiced_buffer_list)
- in_feature = AcousticFeature(
- f0=f0,
- spectrogram=numpy.nan,
- aperiodicity=aperiodicity,
- mfcc=mfcc,
- voiced=voiced,
+ sizes = AcousticFeature.get_sizes(sampling_rate=self.sampling_rate, order=order)
+ keys = ['f0', 'aperiodicity', 'mfcc', 'voiced']
+ in_feature = self.fetch(
+ start_time=start_time,
+ time_length=time_length,
+ extra_time=extra_time,
+ data_stream=self._in_feature_stream,
+ rate=1000 / self.frame_period,
+ pad_function=lambda length: AcousticFeature.silent(length, sizes=sizes, keys=keys),
+ pick_function=lambda segment, first, last: segment.feature.pick(first, last),
+ concat_function=lambda buffers: AcousticFeature.concatenate(buffers, keys=keys),
)
-
out_feature = self.voice_changer.convert_from_acoustic_feature(in_feature)
pad = int(extra_time * 1000 / self.frame_period)
- out_feature= AcousticFeature(
- f0=out_feature.f0[pad:-pad],
- spectrogram=out_feature.spectrogram[pad:-pad],
- aperiodicity=out_feature.aperiodicity[pad:-pad],
- mfcc=out_feature.mfcc[pad:-pad],
- voiced=out_feature.voiced[pad:-pad],
- )
+ out_feature = out_feature.pick(pad, -pad)
return out_feature
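
convert() and post_convert() also lean on AcousticFeature.silent() and AcousticFeature.concatenate(), again not shown in this diff. From the call sites and the per-key buffers they replace, silent(length, sizes=sizes, keys=keys) should produce zero frames shaped [length, sizes[key]] for each requested key, and concatenate(buffers, keys=keys) should join each key across buffers. A rough sketch with guessed bodies (it assumes get_sizes() also reports f0 and voiced as width 1, matching the removed hardcoded shapes):

    @staticmethod
    def silent(length, sizes, keys):
        # zero-filled [length, sizes[key]] array per requested key; the old
        # inline code used a bool dtype for 'voiced', a real version likely does too
        nan = dict(f0=numpy.nan, spectrogram=numpy.nan, aperiodicity=numpy.nan,
                   mfcc=numpy.nan, voiced=numpy.nan)
        arrays = {k: numpy.zeros([length, sizes[k]], dtype=numpy.float32)
                  for k in keys}
        return AcousticFeature(**{**nan, **arrays})

    @staticmethod
    def concatenate(features, keys):
        # one numpy.concatenate per requested key; untouched keys stay numpy.nan
        nan = dict(f0=numpy.nan, spectrogram=numpy.nan, aperiodicity=numpy.nan,
                   mfcc=numpy.nan, voiced=numpy.nan)
        arrays = {k: numpy.concatenate([getattr(f, k) for f in features])
                  for k in keys}
        return AcousticFeature(**{**nan, **arrays})
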
def post_convert(self, start_time: float, time_length: float):
- end_time = start_time + time_length
- f0_buffer_list = []
- sp_buffer_list = []
- ap_buffer_list = []
- voiced_buffer_list = []
- stream = filter(lambda s: not (end_time < s.start_time or s.end_time < start_time), self._out_feature_stream)
-
- start_time_buffer = start_time
- remaining_time = time_length
- for segment in stream:
- # padding
- if segment.start_time > start_time_buffer:
- pad_size = int((segment.start_time - start_time_buffer) * 1000 / self.frame_period)
- dims = AcousticFeature.get_sizes(self.sampling_rate, self.vocoder.acoustic_feature_param.order)
-
- f0_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
- sp_buffer_list.append(numpy.zeros(shape=[pad_size, dims['spectrogram']], dtype=self.in_dtype))
- ap_buffer_list.append(numpy.zeros(shape=[pad_size, dims['aperiodicity']], dtype=self.in_dtype))
- voiced_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=numpy.bool))
-
- start_time_buffer = segment.start_time
-
- if remaining_time > segment.end_time - start_time_buffer:
- one_time_length = segment.end_time - start_time_buffer
- else:
- one_time_length = remaining_time
-
- first_index = int((start_time_buffer - segment.start_time) * 1000 / self.frame_period)
- last_index = int(first_index + one_time_length * 1000 / self.frame_period)
-
- f0_buffer_list.append(segment.feature.f0[first_index:last_index])
- sp_buffer_list.append(segment.feature.spectrogram[first_index:last_index])
- ap_buffer_list.append(segment.feature.aperiodicity[first_index:last_index])
- voiced_buffer_list.append(segment.feature.voiced[first_index:last_index])
-
- start_time_buffer += one_time_length
- remaining_time -= one_time_length
-
- if start_time_buffer >= end_time:
- break
- else:
- # last padding
- pad_size = int((end_time - start_time_buffer) * 1000 / self.frame_period)
- dims = AcousticFeature.get_sizes(self.sampling_rate, self.vocoder.acoustic_feature_param.order)
-
- f0_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
- sp_buffer_list.append(numpy.zeros(shape=[pad_size, dims['spectrogram']], dtype=self.in_dtype))
- ap_buffer_list.append(numpy.zeros(shape=[pad_size, dims['aperiodicity']], dtype=self.in_dtype))
- voiced_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
-
- f0 = numpy.concatenate(f0_buffer_list)
- spectrogram = numpy.concatenate(sp_buffer_list)
- aperiodicity = numpy.concatenate(ap_buffer_list)
- voiced = numpy.concatenate(voiced_buffer_list)
- out_feature = AcousticFeature(
- f0=f0,
- spectrogram=spectrogram,
- aperiodicity=aperiodicity,
- mfcc=numpy.nan,
- voiced=voiced,
+ order = self.voice_changer.acoustic_converter.config.dataset.param.acoustic_feature_param.order
+ sizes = AcousticFeature.get_sizes(sampling_rate=self.sampling_rate, order=order)
+ keys = ['f0', 'aperiodicity', 'spectrogram', 'voiced']
+ out_feature = self.fetch(
+ start_time=start_time,
+ time_length=time_length,
+ data_stream=self._out_feature_stream,
+ rate=1000 / self.frame_period,
+ pad_function=lambda length: AcousticFeature.silent(length, sizes=sizes, keys=keys),
+ pick_function=lambda segment, first, last: segment.feature.pick(first, last),
+ concat_function=lambda buffers: AcousticFeature.concatenate(buffers, keys=keys),
)
out_wave = self.vocoder.decode(