Diffstat (limited to 'become_yukarin/voice_changer.py')
-rw-r--r--  become_yukarin/voice_changer.py | 243
1 file changed, 78 insertions, 165 deletions
diff --git a/become_yukarin/voice_changer.py b/become_yukarin/voice_changer.py
index 3b75fb1..bed155f 100644
--- a/become_yukarin/voice_changer.py
+++ b/become_yukarin/voice_changer.py
@@ -1,4 +1,5 @@
-from typing import List
+from abc import ABCMeta, abstractproperty, abstractmethod
+from typing import List, Callable, Any
 from typing import NamedTuple
 
 import numpy
@@ -24,16 +25,6 @@ class VoiceChanger(object):
         self.super_resolution = super_resolution
         self.output_sampling_rate = output_sampling_rate
 
-    # def convert_from_wave_path(self, wave_path: str):
-    #     w_in = self.acoustic_converter._wave_process(wave_path)
-    #     return self.convert_from_wave(w_in)
-    #
-    # def convert_from_wave(self, wave: Wave):
-    #     f_in = self.acoustic_converter._feature_process(wave)
-    #     f_high = self.convert_from_acoustic_feature(f_in)
-    #     wave = self.vocoder.decode(f_high)
-    #     return wave
-
     def convert_from_acoustic_feature(self, f_in: AcousticFeature):
         f_low = self.acoustic_converter.convert_to_feature(f_in)
         s_high = self.super_resolution.convert(f_low.spectrogram.astype(numpy.float32))
@@ -41,7 +32,21 @@ class VoiceChanger(object):
         return f_high
 
 
-class FeatureSegment(NamedTuple):
+class BaseSegment(ABCMeta):
+    start_time: float
+
+    @property
+    @abstractmethod
+    def time_length(self) -> float:
+        pass
+
+    @property
+    @abstractmethod
+    def end_time(self) -> float:
+        pass
+
+
+class FeatureSegment(NamedTuple, BaseSegment):
     start_time: float
     feature: AcousticFeature
     frame_period: float
@@ -55,7 +60,7 @@ class FeatureSegment(NamedTuple):
         return self.time_length + self.start_time
 
 
-class Segment(NamedTuple):
+class WaveSegment(NamedTuple, BaseSegment):
     start_time: float
     wave: Wave
 
@@ -81,7 +86,7 @@ class VoiceChangerStream(object):
         self.voice_changer: VoiceChanger = None
         self.vocoder: Vocoder = None
-        self._data_stream = []  # type: List[Segment]
+        self._data_stream = []  # type: List[WaveSegment]
         self._in_feature_stream = []  # type: List[FeatureSegment]
         self._out_feature_stream = []  # type: List[FeatureSegment]
 
@@ -90,7 +95,7 @@ class VoiceChangerStream(object):
         assert wave.sampling_rate == self.sampling_rate
         assert wave.wave.dtype == self.in_dtype
-        segment = Segment(start_time=start_time, wave=wave)
+        segment = WaveSegment(start_time=start_time, wave=wave)
         self._data_stream.append(segment)
 
     def add_in_feature(self, start_time: float, feature: AcousticFeature, frame_period: float):
@@ -113,23 +118,31 @@ class VoiceChangerStream(object):
         self._in_feature_stream = list(filter(lambda s: s.end_time > end_time, self._in_feature_stream))
         self._out_feature_stream = list(filter(lambda s: s.end_time > end_time, self._out_feature_stream))
 
-    def pre_convert(self, start_time: float, time_length: float, extra_time: float):
+    @staticmethod
+    def fetch(
+            start_time: float,
+            time_length: float,
+            data_stream: List[BaseSegment],
+            rate: float,
+            pad_function: Callable[[int], Any],
+            pick_function: Callable[[Any, int, int], Any],
+            concat_function: Callable[[List], Any],
+            extra_time: float = 0,
+    ):
         start_time -= extra_time
         time_length += extra_time * 2
 
         end_time = start_time + time_length
         buffer_list = []
-        stream = filter(lambda s: not (end_time < s.start_time or s.end_time < start_time), self._data_stream)
+        stream = filter(lambda s: not (end_time < s.start_time or s.end_time < start_time), data_stream)
 
         start_time_buffer = start_time
         remaining_time = time_length
         for segment in stream:
             # padding
             if segment.start_time > start_time_buffer:
-                pad = numpy.zeros(
-                    shape=int((segment.start_time - start_time_buffer) * self.sampling_rate),
-                    dtype=self.in_dtype,
-                )
+                length = int((segment.start_time - start_time_buffer) * rate)
+                pad = pad_function(length)
                 buffer_list.append(pad)
                 start_time_buffer = segment.start_time
@@ -138,9 +151,9 @@ class VoiceChangerStream(object):
             else:
                 one_time_length = remaining_time
 
-            first_index = int((start_time_buffer - segment.start_time) * self.sampling_rate)
-            last_index = int(first_index + one_time_length * self.sampling_rate)
-            one_buffer = segment.wave.wave[first_index:last_index]
+            first_index = int((start_time_buffer - segment.start_time) * rate)
+            last_index = int(first_index + one_time_length * rate)
+            one_buffer = pick_function(segment, first_index, last_index)
             buffer_list.append(one_buffer)
 
             start_time_buffer += one_time_length
@@ -150,163 +163,63 @@ class VoiceChangerStream(object):
                 break
         else:
             # last padding
-            pad = numpy.zeros(shape=int((end_time - start_time_buffer) * self.sampling_rate), dtype=self.in_dtype)
+            length = int((end_time - start_time_buffer) * rate)
+            pad = pad_function(length)
             buffer_list.append(pad)
 
-        buffer = numpy.concatenate(buffer_list)
-        in_wave = Wave(wave=buffer, sampling_rate=self.sampling_rate)
+        buffer = concat_function(buffer_list)
+        return buffer
+
+    def pre_convert(self, start_time: float, time_length: float, extra_time: float):
+        wave = self.fetch(
+            start_time=start_time,
+            time_length=time_length,
+            extra_time=extra_time,
+            data_stream=self._data_stream,
+            rate=self.sampling_rate,
+            pad_function=lambda length: numpy.zeros(shape=length, dtype=self.in_dtype),
+            pick_function=lambda segment, first, last: segment.wave.wave[first:last],
+            concat_function=numpy.concatenate,
+        )
+        in_wave = Wave(wave=wave, sampling_rate=self.sampling_rate)
         in_feature = self.vocoder.encode(in_wave)
 
         pad = int(extra_time / (self.vocoder.acoustic_feature_param.frame_period / 1000))
-        in_feature = AcousticFeature(
-            f0=in_feature.f0[pad:-pad],
-            spectrogram=in_feature.spectrogram[pad:-pad],
-            aperiodicity=in_feature.aperiodicity[pad:-pad],
-            mfcc=in_feature.mfcc[pad:-pad],
-            voiced=in_feature.voiced[pad:-pad],
-        )
+        in_feature = in_feature.pick(pad, -pad)
         return in_feature
 
     def convert(self, start_time: float, time_length: float, extra_time: float):
-        start_time -= extra_time
-        time_length += extra_time * 2
-
         order = self.voice_changer.acoustic_converter.config.dataset.param.acoustic_feature_param.order
-
-        end_time = start_time + time_length
-        f0_buffer_list = []
-        mfcc_buffer_list = []
-        ap_buffer_list = []
-        voiced_buffer_list = []
-        stream = filter(lambda s: not (end_time < s.start_time or s.end_time < start_time), self._in_feature_stream)
-
-        start_time_buffer = start_time
-        remaining_time = time_length
-        for segment in stream:
-            # padding
-            if segment.start_time > start_time_buffer:
-                pad_size = int((segment.start_time - start_time_buffer) * 1000 / self.frame_period)
-                dims = AcousticFeature.get_sizes(self.sampling_rate, order)
-
-                f0_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
-                mfcc_buffer_list.append(numpy.zeros(shape=[pad_size, dims['mfcc']], dtype=self.in_dtype))
-                ap_buffer_list.append(numpy.zeros(shape=[pad_size, dims['aperiodicity']], dtype=self.in_dtype))
-                voiced_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=numpy.bool))
-
-                start_time_buffer = segment.start_time
-            if remaining_time > segment.end_time - start_time_buffer:
-                one_time_length = segment.end_time - start_time_buffer
-            else:
-                one_time_length = remaining_time
-
-            first_index = int((start_time_buffer - segment.start_time) * 1000 / self.frame_period)
-            last_index = int(first_index + one_time_length * 1000 / self.frame_period)
-
-            f0_buffer_list.append(segment.feature.f0[first_index:last_index])
-            mfcc_buffer_list.append(segment.feature.mfcc[first_index:last_index])
-            ap_buffer_list.append(segment.feature.aperiodicity[first_index:last_index])
-            voiced_buffer_list.append(segment.feature.voiced[first_index:last_index])
-
-            start_time_buffer += one_time_length
-            remaining_time -= one_time_length
-
-            if start_time_buffer >= end_time:
-                break
-        else:
-            # last padding
-            pad_size = int((end_time - start_time_buffer) * 1000 / self.frame_period)
-            dims = AcousticFeature.get_sizes(self.sampling_rate, order)
-
-            f0_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
-            mfcc_buffer_list.append(numpy.zeros(shape=[pad_size, dims['mfcc']], dtype=self.in_dtype))
-            ap_buffer_list.append(numpy.zeros(shape=[pad_size, dims['aperiodicity']], dtype=self.in_dtype))
-            voiced_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=numpy.bool))
-
-        f0 = numpy.concatenate(f0_buffer_list)
-        mfcc = numpy.concatenate(mfcc_buffer_list)
-        aperiodicity = numpy.concatenate(ap_buffer_list)
-        voiced = numpy.concatenate(voiced_buffer_list)
-        in_feature = AcousticFeature(
-            f0=f0,
-            spectrogram=numpy.nan,
-            aperiodicity=aperiodicity,
-            mfcc=mfcc,
-            voiced=voiced,
+        sizes = AcousticFeature.get_sizes(sampling_rate=self.sampling_rate, order=order)
+        keys = ['f0', 'aperiodicity', 'mfcc', 'voiced']
+        in_feature = self.fetch(
+            start_time=start_time,
+            time_length=time_length,
+            extra_time=extra_time,
+            data_stream=self._in_feature_stream,
+            rate=1000 / self.frame_period,
+            pad_function=lambda length: AcousticFeature.silent(length, sizes=sizes, keys=keys),
+            pick_function=lambda segment, first, last: segment.feature.pick(first, last),
+            concat_function=lambda buffers: AcousticFeature.concatenate(buffers, keys=keys),
         )
-
         out_feature = self.voice_changer.convert_from_acoustic_feature(in_feature)
 
         pad = int(extra_time * 1000 / self.frame_period)
-        out_feature= AcousticFeature(
-            f0=out_feature.f0[pad:-pad],
-            spectrogram=out_feature.spectrogram[pad:-pad],
-            aperiodicity=out_feature.aperiodicity[pad:-pad],
-            mfcc=out_feature.mfcc[pad:-pad],
-            voiced=out_feature.voiced[pad:-pad],
-        )
+        out_feature = out_feature.pick(pad, -pad)
        return out_feature
 
     def post_convert(self, start_time: float, time_length: float):
-        end_time = start_time + time_length
-        f0_buffer_list = []
-        sp_buffer_list = []
-        ap_buffer_list = []
-        voiced_buffer_list = []
-        stream = filter(lambda s: not (end_time < s.start_time or s.end_time < start_time), self._out_feature_stream)
-
-        start_time_buffer = start_time
-        remaining_time = time_length
-        for segment in stream:
-            # padding
-            if segment.start_time > start_time_buffer:
-                pad_size = int((segment.start_time - start_time_buffer) * 1000 / self.frame_period)
-                dims = AcousticFeature.get_sizes(self.sampling_rate, self.vocoder.acoustic_feature_param.order)
-
-                f0_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
-                sp_buffer_list.append(numpy.zeros(shape=[pad_size, dims['spectrogram']], dtype=self.in_dtype))
-                ap_buffer_list.append(numpy.zeros(shape=[pad_size, dims['aperiodicity']], dtype=self.in_dtype))
-                voiced_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=numpy.bool))
-
-                start_time_buffer = segment.start_time
-
-            if remaining_time > segment.end_time - start_time_buffer:
-                one_time_length = segment.end_time - start_time_buffer
-            else:
-                one_time_length = remaining_time
-
-            first_index = int((start_time_buffer - segment.start_time) * 1000 / self.frame_period)
-            last_index = int(first_index + one_time_length * 1000 / self.frame_period)
-
-            f0_buffer_list.append(segment.feature.f0[first_index:last_index])
-            sp_buffer_list.append(segment.feature.spectrogram[first_index:last_index])
-            ap_buffer_list.append(segment.feature.aperiodicity[first_index:last_index])
-            voiced_buffer_list.append(segment.feature.voiced[first_index:last_index])
-
-            start_time_buffer += one_time_length
-            remaining_time -= one_time_length
-
-            if start_time_buffer >= end_time:
-                break
-        else:
-            # last padding
-            pad_size = int((end_time - start_time_buffer) * 1000 / self.frame_period)
-            dims = AcousticFeature.get_sizes(self.sampling_rate, self.vocoder.acoustic_feature_param.order)
-
-            f0_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
-            sp_buffer_list.append(numpy.zeros(shape=[pad_size, dims['spectrogram']], dtype=self.in_dtype))
-            ap_buffer_list.append(numpy.zeros(shape=[pad_size, dims['aperiodicity']], dtype=self.in_dtype))
-            voiced_buffer_list.append(numpy.zeros(shape=[pad_size, 1], dtype=self.in_dtype))
-
-        f0 = numpy.concatenate(f0_buffer_list)
-        spectrogram = numpy.concatenate(sp_buffer_list)
-        aperiodicity = numpy.concatenate(ap_buffer_list)
-        voiced = numpy.concatenate(voiced_buffer_list)
-        out_feature = AcousticFeature(
-            f0=f0,
-            spectrogram=spectrogram,
-            aperiodicity=aperiodicity,
-            mfcc=numpy.nan,
-            voiced=voiced,
+        order = self.voice_changer.acoustic_converter.config.dataset.param.acoustic_feature_param.order
+        sizes = AcousticFeature.get_sizes(sampling_rate=self.sampling_rate, order=order)
+        keys = ['f0', 'aperiodicity', 'spectrogram', 'voiced']
+        out_feature = self.fetch(
+            start_time=start_time,
+            time_length=time_length,
+            data_stream=self._out_feature_stream,
+            rate=1000 / self.frame_period,
+            pad_function=lambda length: AcousticFeature.silent(length, sizes=sizes, keys=keys),
+            pick_function=lambda segment, first, last: segment.feature.pick(first, last),
+            concat_function=lambda buffers: AcousticFeature.concatenate(buffers, keys=keys),
        )
 
         out_wave = self.vocoder.decode(
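The refactor above calls three AcousticFeature helpers — silent, pick, and concatenate — that this diff never defines; they are presumably introduced elsewhere in the same commit. A minimal sketch of the behavior the call sites require, assuming AcousticFeature is a plain container of per-frame numpy arrays (this implementation is an assumption, not the commit's actual code):

from typing import Dict, List

import numpy


class AcousticFeature(object):
    # Sketch only: stands in for become_yukarin's real AcousticFeature.
    def __init__(self, f0=numpy.nan, spectrogram=numpy.nan, aperiodicity=numpy.nan,
                 mfcc=numpy.nan, voiced=numpy.nan):
        self.f0 = f0
        self.spectrogram = spectrogram
        self.aperiodicity = aperiodicity
        self.mfcc = mfcc
        self.voiced = voiced

    @staticmethod
    def silent(length: int, sizes: Dict[str, int], keys: List[str]) -> 'AcousticFeature':
        # Zero-filled frames: what fetch's pad_function returns for feature
        # streams (the old code built these zero arrays inline, key by key).
        arrays = {key: numpy.zeros(shape=[length, sizes[key]], dtype=numpy.float32)
                  for key in keys}
        if 'voiced' in arrays:
            arrays['voiced'] = arrays['voiced'].astype(bool)  # old code padded voiced as bool
        return AcousticFeature(**arrays)

    def pick(self, first: int, last: int) -> 'AcousticFeature':
        # Frame-axis slice of every array field: pick(first, last) serves as
        # pick_function, and pick(pad, -pad) trims the extra_time margins.
        fields = ('f0', 'spectrogram', 'aperiodicity', 'mfcc', 'voiced')
        return AcousticFeature(**{key: getattr(self, key)[first:last]
                                  for key in fields
                                  if isinstance(getattr(self, key), numpy.ndarray)})

    @staticmethod
    def concatenate(features: List['AcousticFeature'], keys: List[str]) -> 'AcousticFeature':
        # Frame-axis concatenation of the requested keys: fetch's concat_function.
        return AcousticFeature(**{key: numpy.concatenate([getattr(f, key) for f in features])
                                  for key in keys})

Keying silent and concatenate by an explicit keys list matches how the two call sites differ: convert pads the mfcc-side features while post_convert pads the spectrogram-side ones.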

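Because fetch is now a @staticmethod with all type-specific behavior injected, the gap-padding and slicing logic can be exercised on plain arrays. A toy run under stated assumptions: ToySegment and the rate of 10 samples per second are invented for illustration; only VoiceChangerStream.fetch comes from this file.

from typing import NamedTuple

import numpy

from become_yukarin.voice_changer import VoiceChangerStream


class ToySegment(NamedTuple):
    # Duck-typed stand-in for WaveSegment: fetch only reads start_time,
    # end_time, and whatever pick_function takes off the segment.
    start_time: float
    data: numpy.ndarray

    @property
    def time_length(self) -> float:
        return len(self.data) / 10  # toy rate: 10 samples per second

    @property
    def end_time(self) -> float:
        return self.start_time + self.time_length


# Request [0.5, 2.5) from a stream that only covers [1.0, 2.0): fetch
# zero-pads the missing half second on each side and slices the overlap.
buffer = VoiceChangerStream.fetch(
    start_time=0.5,
    time_length=2.0,
    data_stream=[ToySegment(start_time=1.0, data=numpy.ones(10, dtype=numpy.float32))],
    rate=10,
    pad_function=lambda length: numpy.zeros(length, dtype=numpy.float32),
    pick_function=lambda segment, first, last: segment.data[first:last],
    concat_function=numpy.concatenate,
)
assert len(buffer) == 20           # time_length * rate
assert buffer[:5].sum() == 0       # leading pad
assert buffer[5:15].sum() == 10    # samples picked from the segment

The same call shape is what pre_convert, convert, and post_convert now pass, differing only in the rate (samples versus frames per second) and in the three injected callables.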