diff options
Diffstat (limited to 'pysoundtouch/tools/Shifter.py')
| -rw-r--r-- | pysoundtouch/tools/Shifter.py | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/pysoundtouch/tools/Shifter.py b/pysoundtouch/tools/Shifter.py new file mode 100644 index 0000000..612fa7a --- /dev/null +++ b/pysoundtouch/tools/Shifter.py @@ -0,0 +1,238 @@ +from ReadAudio import AudioReader, ConvertReader +import soundtouch, wave, audioop, math +from array import array + +class Shifter: + @staticmethod + def shift_chunk(chunk, sampling_rate, channels, shift): + """Shift the pitch of a chunk of audio up or down + Width must be 2.""" + st = soundtouch.SoundTouch(sampling_rate, channels) + st.set_pitch_shift(shift) + + ii = 0 + resstr = "" + while ii + 4608 < len(chunk): + st.put_samples(chunk[ii:ii+4608].tostring()) + ii += 4608 + while st.ready_count() > 0: + resstr += st.get_samples(4608) + + st.put_samples(chunk[ii:]) + while st.ready_count() > 0: + resstr += st.get_samples(11025) + + resstr += Shifter.get_flush(st, channels, len(chunk) - len(resstr) / 2) + + del st + + result = array(chunk.typecode) + result.fromstring(resstr) + + return result + + @staticmethod + def many_shift_chunk(chunk, sampling_rate, channels, shifts): + """Produce harmonies by shifting a chunk of audio more than once and combining them.""" + shifteds = [] + maxlen = 0 + for jj in xrange(len(shifts)): + if not shifts[jj]: + shifted = chunk + else: + shifted = Shifter.shift_chunk(chunk, sampling_rate, channels, shifts[jj]) + + shifteds.append(shifted) + maxlen = max(maxlen, len(shifted)) + + if len(shifteds) > 1: + newchunk = [0] * maxlen + for ii in xrange(maxlen): + count = 0 + for jj in xrange(len(shifteds)): + if len(shifteds[jj]) > ii: + newchunk[ii] += shifteds[jj][ii] + count += 1 + newchunk[ii] /= count + + result = array(chunk.typecode) + result.fromlist(newchunk) + return result + else: + return shifteds[0] + + @staticmethod + def raw_shift_reader(srcpath, dstpath, shift): + """Shift an entire file up or down""" + # Open the file and convert it to have SoundTouch's required 2-byte samples + reader = AudioReader.open(srcpath) + reader2 = ConvertReader(reader, set_raw_width=2) + + # Create the SoundTouch object and set the given shift + st = soundtouch.SoundTouch(reader2.sampling_rate(), reader2.channels()) + st.set_pitch_shift(shift) + + # Create the .WAV file to write the result to + writer = wave.open(dstpath, 'w') + writer.setnchannels(reader2.channels()) + writer.setframerate(reader2.sampling_rate()) + writer.setsampwidth(reader2.raw_width()) + + # Read values and feed them into SoundTouch + while True: + data = reader2.raw_read() + if not data: + break + + print len(data) + st.put_samples(data) + + while st.ready_count() > 0: + writer.writeframes(st.get_samples(11025)) + + # Flush any remaining values + writer.writeframes(Shifter.get_flush(st, reader2.channels())) + + # Clean up + writer.close() + reader2.close() + + @staticmethod + def get_flush(st, channels, fade=0): + """Like soundtouch's flush, don't require that all data comes through, just any. + If fade > 0, only allow [fade] samples, and linearly scale volume to 0 over that length""" + + waiting = st.waiting_count() + ready = st.ready_count() + result = "" + + silence = array('h', [0] * 64) + + while st.ready_count() == ready: + st.put_samples(silence) + + while st.ready_count() > 0: + result += st.get_samples(11025) + + st.clear() + + if len(result) > 2 * channels * waiting: + result = result[0:(2 * channels * waiting)] + + fade = min(fade, len(result) / 2) + if fade > 0: + resultstring = "" + for ii in xrange(fade / channels): + i0 = ii * 2*channels + i1 = (ii+1) * 2*channels + resultstring += audioop.mul(result[i0:i1], 2, 1 - float(ii) / (fade / channels)) + result = resultstring + + return result + + @staticmethod + def bpm_detect_file(fullpath): + """Detect the beat from an entire file""" + reader = AudioReader.open(fullpath) + reader2 = ConvertReader(reader, set_raw_width=2) + + bd = soundtouch.BPMDetect(reader2.sampling_rate(), reader2.channels()) + + while True: + data = reader2.raw_read() + if not data: + break + + bd.put_samples(data) + + reader2.close() + + return bd.get_bpm() + + @staticmethod + def echocancel(outputdata, inputdata): + """Try to identify an echo and remove it. + Should contain 2-byte samples""" + pos = audioop.findmax(outputdata, 800) + out_test = outputdata[pos*2:] + in_test = inputdata[pos*2:] + ipos, factor = audioop.findfit(in_test, out_test) + factor = audioop.findfactor(in_test[ipos*2:ipos*2+len(out_test)], out_test) + prefill = '\0'*(pos+ipos)*2 + postfill = '\0'*(len(inputdata) - len(prefill) - len(outputdata)) + outputdata = prefill + audioop.mul(outputdata, 2, -factor) + postfill + return audioop.add(inputdata, outputdata, 2) + + @staticmethod + def beats_to_ms(bpm, beats): + """Convert from bpm at a given beat rate to ms between beats.""" + return 60 * 1000 * beats / bpm + + @staticmethod + def find_division_start(fullpath, bpm, beats_per): + """Identify the start of the beats, by finding segments that fit together""" + reader = AudioReader.open(fullpath) + + # This doesn't find the exact time of the max, but don't need it. + max_value = 0 + max_time = 0 + while True: + data = reader.raw_read() + if data is None: + break + + data_max = audioop.max(data, reader.raw_width()) + if data_max > max_value: + max_value = data_max + max_time = reader.current_time() + + before = max_time - Shifter.beats_to_ms(bpm, beats_per) + after = max_time + 2 * Shifter.beats_to_ms(bpm, beats_per) + if before < 0: + after += -before + before = 0 + if after > reader.duration(): + before -= after - reader.duration() + after = reader.duration() + if before < 0: + if beats_per < 2: + raise RuntimeError('This audio file is too short to be divided by beats.') + else: + reader.close() + return Shifting.find_division_start(filepath, bpm, int(beats_per / 2)) + + reader.seek_time(0) + reader2 = ConvertReader(reader, set_raw_width=2, set_channels=1) + region = reader2.raw_random_read(before, after) + + # both in bytes + raw_length = 2 * int(len(region) / 6) + beat_length = int(2 * Shifter.beats_to_ms(bpm, 1) * reader2.sampling_rate() / 1000.0) + + print "Around max: " + str(before) + " - " + str(after) + ": " + str(raw_length) + + min_factor = 0 + min_ii = 0 + # First determine time within a beat + for ii in xrange(beat_length / 200): + factor = audioop.findfactor(region[200*ii:200*ii+raw_length], region[200*ii+raw_length:200*ii+2*raw_length]) + if factor < min_factor: + print "Samp: At " + str(ii) + " " + str(factor) + min_factor = factor + min_ii = ii + + # Second, determine which beat to use + min_factor = 0 + min_jj = 0 + for jj in xrange(beats_per): + factor = audioop.findfactor(region[jj*beat_length+200*min_ii:jj*beat_length+200*min_ii+raw_length], region[jj*beat_length+200*min_ii+raw_length:jj*beat_length+200*min_ii+2*raw_length]) + print "Beat: At " + str(jj) + " " + str(factor) + if factor < min_factor: + min_factor = factor + min_jj = jj + + print "Best: Beat: " + str(min_jj) + ", Samp: " + str(100*min_ii) + start_time = before + (min_jj*beat_length*2 + 100*min_ii) * 1000.0 / reader2.sampling_rate() + reader2.close() + + return math.fmod(start_time, Shifter.beats_to_ms(bpm, beats_per)) |
