author    yo mama <pepper@scannerjammer.com>  2015-06-19 16:24:27 -0400
committer yo mama <pepper@scannerjammer.com>  2015-06-19 16:24:27 -0400
commit    8adfb3bd99b4dcff2459756af090a640fd7a4b4a (patch)
tree      c1e6adddda335f4d36a98039ccc5ac867ae7296d /pysoundtouch/tools
Diffstat (limited to 'pysoundtouch/tools')
-rw-r--r--  pysoundtouch/tools/ReadAudio.py  616
-rw-r--r--  pysoundtouch/tools/Shifter.py    238
2 files changed, 854 insertions, 0 deletions
diff --git a/pysoundtouch/tools/ReadAudio.py b/pysoundtouch/tools/ReadAudio.py
new file mode 100644
index 0000000..62fc078
--- /dev/null
+++ b/pysoundtouch/tools/ReadAudio.py
@@ -0,0 +1,616 @@
+import mad, wave, aifc, sunau, time
+import Image, ImageDraw, math
+from array import array
+import audioop
+
+### Abstract AudioReader class
+class AudioReader:
+ @staticmethod
+ def open(filepath):
+ """Tries to determine the format of the file, and open it with an appropriate AudioReader subclass."""
+ reader = AudioReader.reader(filepath)
+ if not reader:
+ return None
+ return reader(filepath)
+
+ @staticmethod
+ def reader(filepath):
+ """Tries to determine the format of the file and returns an appropriate AudioReader subclass."""
+ filelow = filepath.lower()
+ if filelow.endswith('.mp3'):
+ return MP3Reader
+ if filelow.endswith('.wav') or filelow.endswith('.aif') or filelow.endswith('.aiff') or filelow.endswith('.au'):
+ return PCMReader
+ return None
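+
+    # Usage sketch for open()/reader() above (the path is hypothetical):
+    #   reader = AudioReader.open("song.mp3")   # an MP3Reader, or None if unrecognized
+    #   if reader:
+    #       print reader.duration(), "ms at", reader.sampling_rate(), "Hz"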
+
+    # All AudioReader objects track an end-of-file flag and any leftover samples from a read operation
+ def __init__(self, filepath):
+ self.filepath = filepath
+ self.eof = False
+ self.leftovers = [] # leftovers from random_read/continue_read
+
+ # Call the close function on deallocation
+ def __del__(self):
+ try:
+ self.close()
+ except:
+ pass
+
+ # OVERRIDE REQUIRED
+ def sampling_rate(self):
+ """Return the samples (frames) per second"""
+ return 0
+
+ # OVERRIDE REQUIRED
+ def duration(self):
+ """Return the duration in ms"""
+ return 0
+
+ # OVERRIDE REQUIRED
+ def current_time(self):
+ """Return the current time in ms"""
+ return 0
+
+ # OVERRIDE REQUIRED
+ def seek_time(self, time):
+ """Set the read pointer to the specified time (in ms)"""
+ pass
+
+ # OVERRIDE REQUIRED
+ def raw_width(self):
+ """Return the width in bytes of raw samples"""
+ pass
+
+ # OVERRIDE REQUIRED
+ def raw_read(self):
+ """Return some amount of data as a raw audio string"""
+ pass
+
+ def has_unsigned_singles(self):
+ """Is the raw data when this has a width of 1 stored in unsigned bytes (but not for higher widths)"""
+ return False
+
+ # OVERRIDE REQUIRED
+ def read(self):
+ """Return some number of frames of an channel-interleaved array (len = NxC) of the appropriate sample depth"""
+ pass
+
+ def close(self):
+ """Perform any necessary cleanup on deallocation."""
+ pass
+
+ def random_read(self, start, end, debugs=None):
+ """Return the frames between start and end"""
+ if self.current_time() != start:
+ self.seek_time(start)
+ lenout = int((end - start) * self.sampling_rate() / 1000.0) * self.channels()
+ return self.length_read(lenout, debugs)
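+
+    # Usage sketch for random_read above: pull the first half-second as one
+    # channel-interleaved array (len = frames * channels):
+    #   samples = reader.random_read(0, 500)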
+
+ def continue_read(self, end, debugs=None):
+ """Continue reading from the current read head."""
+ if debugs is not None:
+ debugs.append("Continue from " + str(len(self.leftovers)) + " + " + str(self.current_time()) + " to " + str(end))
+
+ # First take any samples from leftovers
+ if self.leftovers:
+ leftovers = self.leftovers
+ self.leftovers = []
+
+            carried_ms = (len(leftovers) / self.channels()) * 1000.0 / self.sampling_rate()
+            lenout = int((end - (self.current_time() - carried_ms)) * self.sampling_rate() / 1000.0) * self.channels() - len(leftovers)
+
+ after = self.length_read(lenout, debugs)
+ result = array(after.typecode)
+ result.extend(leftovers)
+ result.extend(after)
+
+ return result
+ else:
+ # Call random_read as necessary
+ return self.random_read(self.current_time(), end, debugs)
+
+ def length_read(self, lenout, debugs=None):
+ """Read a given number of samples, by repeated calls to read()."""
+ result = self.read()
+ if result is None:
+ return None
+ while len(result) < lenout:
+ data = self.read()
+ if data is None:
+ break
+ result.extend(data)
+
+ # Put any extra samples in leftovers
+ if len(result) > lenout:
+ self.leftovers = result[lenout:]
+ result = result[:lenout]
+ else:
+ self.leftovers = []
+
+ if debugs is not None:
+ debugs.append("length_read: Got " + str(lenout) + " at " + str(self.current_time()))
+
+ return result
+
+ def raw_random_read(self, start, end):
+ """Return the raw samples between start and end
+ XXX: Consider converting to random_read-style length-based limiter"""
+ self.seek_time(start)
+ result = self.raw_read()
+ if result is None:
+ return None
+ before = self.current_time()
+ if before > end:
+            fraction = (float(end - start) / (before - start)) / (self.raw_width() * self.channels())
+ result = result[:(self.raw_width() * self.channels()) * int(fraction * len(result))]
+ return result
+
+ while before < end:
+ data = self.raw_read()
+ if data is None:
+ break
+ after = self.current_time()
+ if after > end:
+                fraction = (float(end - before) / (after - before)) / (self.raw_width() * self.channels())
+ data = data[:(self.raw_width() * self.channels()) * int(fraction * len(data))]
+
+ result += data
+ before = after
+
+ return result
+
+ def audio_to_image(self, filepath, width, height, divisor=0, dividers=None, start=0, end=None):
+ """Construct a graph of the samples and save to filepath."""
+ if (end is None):
+ end = self.duration()
+
+ if (start > 0):
+ self.seek_time(start)
+ lastRecorded = self.current_time()
+ else:
+ self.seek_time(0)
+ lastRecorded = start
+
+ time0 = time.clock()
+
+ ticksPerPixel = (end - start) / float(width)
+ maxEnergy = 0
+ pointsAbove = [0, height/2] # Use for left channel
+ pointsBelow = [] # Use for right channel
+ while lastRecorded < 0:
+ pointsAbove.append(len(pointsAbove) / 2)
+ pointsAbove.append(height / 2 + 1)
+ pointsBelow.append(height / 2)
+ pointsBelow.append(len(pointsBelow) / 2)
+ lastRecorded += ticksPerPixel
+
+ while time.clock() - time0 < 10 and self.current_time() < end:
+ data = self.read()
+ if data is None:
+ break
+ if divisor == 0:
+ divisor = pow(256, data.itemsize) / (2*math.sqrt(2))
+
+ startLoop = lastRecorded
+ while self.current_time() > lastRecorded + ticksPerPixel and lastRecorded < end:
+ segment = data[(self.channels() * int((lastRecorded - startLoop) * self.sampling_rate() / 1000.0)):(self.channels() * int((lastRecorded - startLoop + ticksPerPixel) * self.sampling_rate() / 1000.0))]
+ energyLeft = 0
+ energyRight = 0
+ for ii in range(len(segment) / self.channels()):
+ energyLeft += abs(segment[ii * self.channels()])
+ if self.channels() == 1:
+ energyRight += abs(segment[ii])
+ else:
+ energyRight += abs(segment[ii * self.channels() + 1])
+
+ if len(segment) > 0:
+ energyLeft *= float(self.channels()) / len(segment)
+ energyRight *= float(self.channels()) / len(segment)
+
+ if energyLeft / divisor > .5:
+ divisor = 2 * energyLeft
+ if energyRight / divisor > .5:
+ divisor = 2 * energyRight
+ maxEnergy = max(maxEnergy, energyLeft, energyRight)
+
+ pointsAbove.append(len(pointsAbove) / 2)
+ pointsAbove.append(height / 2 - height * energyLeft / divisor)
+ pointsBelow.append(height / 2 + height * energyRight / divisor)
+ pointsBelow.append(len(pointsBelow) / 2)
+ lastRecorded += ticksPerPixel
+ lastRecorded = (len(pointsAbove) / 2) * ticksPerPixel + start
+
+ if maxEnergy < divisor / 3 and maxEnergy > 0:
+ # Try again, with a lower divisor
+ reader = AudioReader.open(self.filepath)
+ if reader:
+ return reader.audio_to_image(filepath, width, height, divisor=2 * maxEnergy, dividers=dividers, start=start, end=end)
+
+ pointsAbove.append(width)
+ pointsAbove.append(height / 2)
+ image = Image.new("RGB", (width, height), "Black")
+ draw = ImageDraw.Draw(image)
+ pointsBelow.reverse()
+ pointsAbove.extend(pointsBelow)
+ draw.polygon(pointsAbove, fill="Blue")
+
+ print lastRecorded
+
+ if dividers:
+ for ii in xrange(len(dividers)):
+ draw.line([(width * dividers[ii] * 1000.0 / self.duration(), 0),
+ (width * dividers[ii] * 1000.0 / self.duration(), height)], fill="Red")
+
+ del draw
+
+ print(filepath)
+        out = open(filepath, "wb")
+ image.save(out, "PNG")
+ return maxEnergy
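+
+    # Usage sketch for audio_to_image above (path and dimensions are illustrative):
+    #   reader = AudioReader.open("song.mp3")
+    #   max_energy = reader.audio_to_image("song.png", 800, 200)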
+
+### Reader of MP3 files
+class MP3Reader(AudioReader):
+ def __init__(self, filepath):
+ # Use mad to read the MP3 file.
+ AudioReader.__init__(self, filepath)
+ self.mf = mad.MadFile(filepath)
+
+ def channels(self):
+ # mad always returns a dual-channel stream
+ return 2
+
+ def sampling_rate(self):
+ return self.mf.samplerate()
+
+ def duration(self):
+ return self.mf.total_time()
+
+ def current_time(self):
+ return self.mf.current_time()
+
+ def seek_time(self, time):
+ """Set the read pointer to the specified time (in ms)"""
+ self.mf.seek_time(time)
+
+ def raw_width(self):
+ """Return the width in bytes of raw samples"""
+ return 2
+
+ def raw_read(self):
+ """Return some amount of data as a raw audio string"""
+ buf = self.mf.read()
+ if buf is None:
+ self.eof = True
+ return None
+
+ return buf
+
+ def read(self):
+ buf = self.raw_read()
+ if not buf:
+ return None
+
+ short_array = array('h')
+ short_array.fromstring(buf)
+ return short_array
+
+ def close(self):
+ del self.mf
+
+### Reader for a simple PCM-based file format
+class PCMReader(AudioReader):
+ def __init__(self, filepath):
+ AudioReader.__init__(self, filepath)
+ if filepath.lower().endswith(".aif") or filepath.lower().endswith(".aiff"):
+            self.wf = aifc.open(self.filepath)
+ elif filepath.lower().endswith('.au'):
+ self.wf = sunau.open(self.filepath)
+ else:
+ self.wf = wave.open(self.filepath)
+ self.framesread = 0
+ self.frames_per_read = self.wf.getframerate() / 10
+
+ def channels(self):
+ return self.wf.getnchannels()
+
+ def sampling_rate(self):
+ return self.wf.getframerate()
+
+ def duration(self):
+ return round((1000.0 * self.wf.getnframes()) / self.wf.getframerate())
+
+ def current_time(self):
+ return round((1000.0 * self.framesread) / self.wf.getframerate())
+
+ def seek_time(self, time):
+ """Set the read pointer to the specified time (in ms)"""
+ if time == 0:
+ self.wf.rewind()
+ self.framesread = 0
+ return
+
+ # Check the step size
+ self.wf.rewind()
+ zero = self.wf.tell()
+ buf = self.wf.readframes(1)
+ one = self.wf.tell()
+
+ # We just have to guess, and hope we're right (no way to check!)
+ gotoframe = int(time * self.wf.getframerate() / 1000.0)
+ if gotoframe > self.wf.getnframes():
+ raise ValueError(str(time) + " is beyond " + str(self.duration()))
+
+ gotopos = zero + gotoframe * one
+ try:
+ self.wf.setpos(gotopos)
+ except:
+ raise ValueError("Cannot go to " + str(time) + " with " + str(zero) + ":" + str(one))
+ self.framesread = gotoframe
+
+ def raw_width(self):
+ """Return the width in bytes of raw samples"""
+ return self.wf.getsampwidth()
+
+ def raw_read(self):
+ """Return some amount of data as a raw audio string"""
+ buf = self.wf.readframes(self.frames_per_read)
+ if not buf:
+ self.eof = True
+ return None
+
+        # The last read may return fewer frames than requested, so count what actually arrived
+        self.framesread += len(buf) / (self.wf.getsampwidth() * self.wf.getnchannels())
+
+ return buf
+
+ def has_unsigned_singles(self):
+ """Is the raw data when this has a width of 1 stored in unsigned bytes (but not for higher widths)"""
+ return self.filepath.lower().endswith(".wav")
+
+ def read(self):
+ buf = self.raw_read()
+ if not buf:
+ return None
+
+ if self.wf.getsampwidth() == 1:
+ data_array = array('b')
+ elif self.wf.getsampwidth() == 2:
+ data_array = array('h')
+ else:
+ data_array = array('i')
+ data_array.fromstring(buf)
+ return data_array
+
+ def close(self):
+ self.wf.close()
+
+### Convert the samples from one AudioReader into another format
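+# Usage sketch (a minimal illustration; the source path is hypothetical):
+#   mono = ConvertReader(AudioReader.open("in.mp3"), set_channels=1, set_sampling_rate=22050)
+#   samples = mono.read()   # mono samples resampled to 22050 Hz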
+class ConvertReader(AudioReader):
+ def __init__(self, source, set_channels=None, set_sampling_rate=None, set_raw_width=None):
+ """Constructor:
+ source is an AudioReader; give set_channels, set_sampling_rate, and set_raw_width based on what you want to change."""
+ AudioReader.__init__(self, source.filepath)
+ self.source = source
+ self.set_channels = set_channels
+ self.set_sampling_rate = set_sampling_rate
+ self.set_raw_width = set_raw_width
+ self.ratecv_state = None
+
+ def channels(self):
+ return self.set_channels or self.source.channels()
+
+ def sampling_rate(self):
+ return self.set_sampling_rate or self.source.sampling_rate()
+
+ def duration(self):
+ return self.source.duration()
+
+ def current_time(self):
+ return self.source.current_time()
+
+ def seek_time(self, time):
+ """Set the read pointer to the specified time (in ms)"""
+ self.source.seek_time(time)
+
+ def raw_width(self):
+ """Return the width in bytes of raw samples"""
+ return self.set_raw_width or self.source.raw_width()
+
+ def raw_read(self):
+ """Return some amount of data as a raw audio string"""
+ buf = self.source.raw_read()
+ if buf is None:
+ self.eof = True
+ return None
+
+ # Convert channels as needed
+ if self.set_channels and self.source.channels() != self.set_channels:
+ if self.set_channels == 1:
+ buf = audioop.tomono(buf, self.source.raw_width(), .5, .5)
+ else:
+ buf = audioop.tostereo(buf, self.source.raw_width(), 1, 1)
+
+ # Convert sampling rate as needed
+ if self.set_sampling_rate and self.source.sampling_rate() != self.set_sampling_rate:
+ (buf, self.ratecv_state) = audioop.ratecv(buf, self.source.raw_width(), self.channels(), self.source.sampling_rate(), self.set_sampling_rate, self.ratecv_state)
+
+ if self.set_raw_width and self.source.raw_width() != self.set_raw_width:
+ if self.source.raw_width() == 1 and self.source.has_unsigned_singles():
+ buf = audioop.bias(buf, 1, -128)
+ buf = audioop.lin2lin(buf, self.source.raw_width(), self.set_raw_width)
+ if self.set_raw_width == 1 and self.source.has_unsigned_singles():
+ buf = audioop.bias(buf, 1, 128)
+
+ return buf
+
+ def has_unsigned_singles(self):
+ """Is the raw data when this has a width of 1 stored in unsigned bytes (but not for higher widths)"""
+ return self.source.has_unsigned_singles()
+
+ def read(self):
+ # raw_read handles all basic conversion
+        buf = self.raw_read()
+        if not buf:
+            return None
+
+ # Convert width as needed
+ if self.raw_width() == 1:
+ data_array = array('b')
+ elif self.raw_width() == 2:
+ data_array = array('h')
+ else:
+ data_array = array('i')
+
+ data_array.fromstring(buf)
+ return data_array
+
+ def close(self):
+ self.source.close()
+
+### Scale the audio (volume) in an AudioReader
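+# Usage sketch (values are illustrative):
+#   quiet = ScaleReader(AudioReader.open("in.mp3"), scale=0.5)   # halve the volume
+#   samples = quiet.read()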
+class ScaleReader(AudioReader):
+ def __init__(self, source, scale=1.0, bias=0):
+ """Constructor:
+        source is an AudioReader; scale > 1 increases the volume; bias adds a DC offset (normally inaudible) that can be tweaked to remove clicks."""
+ AudioReader.__init__(self, source.filepath)
+ self.source = source
+ self.scale = scale
+ self.bias = bias
+
+ def channels(self):
+ return self.source.channels()
+
+ def sampling_rate(self):
+ return self.source.sampling_rate()
+
+ def duration(self):
+ return self.source.duration()
+
+ def current_time(self):
+ return self.source.current_time()
+
+ def seek_time(self, time):
+ """Set the read pointer to the specified time (in ms)"""
+ self.source.seek_time(time)
+
+ def raw_width(self):
+ """Return the width in bytes of raw samples"""
+ return self.source.raw_width()
+
+ def raw_read(self):
+ """Return some amount of data as a raw audio string"""
+ buf = self.source.raw_read()
+ if buf is None:
+ self.eof = True
+ return None
+
+ # Perform the scaling and biasing
+ if self.scale != 1.0:
+ buf = audioop.mul(buf, self.source.raw_width(), self.scale)
+
+ if self.bias != 0:
+ buf = audioop.bias(buf, self.source.raw_width(), self.bias)
+
+ return buf
+
+ def has_unsigned_singles(self):
+ """Is the raw data when this has a width of 1 stored in unsigned bytes (but not for higher widths)"""
+ return self.source.has_unsigned_singles()
+
+ def read(self):
+ # raw_read performs the necessary changes
+ buf = self.raw_read()
+ if not buf:
+ return None
+
+ if self.raw_width() == 1:
+ data_array = array('b')
+ elif self.raw_width() == 2:
+ data_array = array('h')
+ else:
+ data_array = array('i')
+
+ data_array.fromstring(buf)
+ return data_array
+
+ def close(self):
+ self.source.close()
+
+### Concatenate two audio files
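+# Usage sketch (paths are hypothetical; the second file is converted to match the first):
+#   both = AppendReader("one.mp3", "two.wav")
+#   total_ms = both.duration()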
+class AppendReader(AudioReader):
+ def __init__(self, one_path, two_path):
+ """Constructor: give two paths to be opened and concatenated."""
+ AudioReader.__init__(self, one_path)
+ self.one_source = AudioReader.open(one_path)
+ # Convert the second file to be like the first
+        self.two_source = ConvertReader(AudioReader.open(two_path), self.one_source.channels(), self.one_source.sampling_rate(), self.one_source.raw_width())
+        self.cur_time = 0  # stored as cur_time so it doesn't shadow the current_time() method
+
+ def channels(self):
+ return self.one_source.channels()
+
+ def sampling_rate(self):
+ return self.one_source.sampling_rate()
+
+ def duration(self):
+ return self.one_source.duration() + self.two_source.duration()
+
+ def current_time(self):
+        return self.cur_time
+
+ def seek_time(self, time):
+ """Set the read pointer to the specified time (in ms)"""
+ # Seek to one file or the other
+ if time < self.one_source.duration():
+ self.one_source.seek_time(time)
+ else:
+ self.two_source.seek_time(time - self.one_source.duration())
+        self.cur_time = time
+
+ def raw_width(self):
+ """Return the width in bytes of raw samples"""
+ return self.one_source.raw_width()
+
+ def raw_read(self):
+ """Return some amount of data as a raw audio string"""
+        if self.cur_time < self.one_source.duration():
+ # Read from the first audio source
+ buf = self.one_source.raw_read()
+ if buf is None:
+ buf = self.two_source.raw_read()
+ if buf is None:
+ self.eof = True
+ return None
+
+                self.cur_time = self.one_source.duration() + self.two_source.current_time()
+ return buf
+ else:
+                self.cur_time = self.one_source.current_time()
+ return buf
+ else:
+ # Read from the second audio source
+ buf = self.two_source.raw_read()
+ if buf is None:
+ self.eof = True
+ return None
+
+            self.cur_time = self.one_source.duration() + self.two_source.current_time()
+ return buf
+
+ def has_unsigned_singles(self):
+ """Is the raw data when this has a width of 1 stored in unsigned bytes (but not for higher widths)"""
+ return self.one_source.has_unsigned_singles()
+
+ def read(self):
+        buf = self.raw_read()
+        if not buf:
+            return None
+
+ if self.one_source.raw_width() == 1:
+ data_array = array('b')
+ elif self.one_source.raw_width() == 2:
+ data_array = array('h')
+ else:
+ data_array = array('i')
+
+ data_array.fromstring(buf)
+ return data_array
+
+ def close(self):
+ self.one_source.close()
+ self.two_source.close()
diff --git a/pysoundtouch/tools/Shifter.py b/pysoundtouch/tools/Shifter.py
new file mode 100644
index 0000000..612fa7a
--- /dev/null
+++ b/pysoundtouch/tools/Shifter.py
@@ -0,0 +1,238 @@
+from ReadAudio import AudioReader, ConvertReader
+import soundtouch, wave, audioop, math
+from array import array
+
+class Shifter:
+ @staticmethod
+ def shift_chunk(chunk, sampling_rate, channels, shift):
+ """Shift the pitch of a chunk of audio up or down
+ Width must be 2."""
+ st = soundtouch.SoundTouch(sampling_rate, channels)
+ st.set_pitch_shift(shift)
+
+ ii = 0
+ resstr = ""
+ while ii + 4608 < len(chunk):
+ st.put_samples(chunk[ii:ii+4608].tostring())
+ ii += 4608
+ while st.ready_count() > 0:
+ resstr += st.get_samples(4608)
+
+        st.put_samples(chunk[ii:].tostring())
+ while st.ready_count() > 0:
+ resstr += st.get_samples(11025)
+
+ resstr += Shifter.get_flush(st, channels, len(chunk) - len(resstr) / 2)
+
+ del st
+
+ result = array(chunk.typecode)
+ result.fromstring(resstr)
+
+ return result
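+
+    # Usage sketch for shift_chunk above (illustrative values; this assumes
+    # set_pitch_shift takes semitones, like SoundTouch's setPitchSemiTones):
+    #   chunk = array('h', [0] * 44100 * 2)              # one second of silent stereo
+    #   up = Shifter.shift_chunk(chunk, 44100, 2, 2.0)   # shift up two semitones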
+
+ @staticmethod
+ def many_shift_chunk(chunk, sampling_rate, channels, shifts):
+ """Produce harmonies by shifting a chunk of audio more than once and combining them."""
+ shifteds = []
+ maxlen = 0
+ for jj in xrange(len(shifts)):
+ if not shifts[jj]:
+ shifted = chunk
+ else:
+ shifted = Shifter.shift_chunk(chunk, sampling_rate, channels, shifts[jj])
+
+ shifteds.append(shifted)
+ maxlen = max(maxlen, len(shifted))
+
+ if len(shifteds) > 1:
+ newchunk = [0] * maxlen
+ for ii in xrange(maxlen):
+ count = 0
+ for jj in xrange(len(shifteds)):
+ if len(shifteds[jj]) > ii:
+ newchunk[ii] += shifteds[jj][ii]
+ count += 1
+ newchunk[ii] /= count
+
+ result = array(chunk.typecode)
+ result.fromlist(newchunk)
+ return result
+ else:
+ return shifteds[0]
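+
+    # Usage sketch for many_shift_chunk above: mix the original with a major
+    # third and a fifth (semitone offsets are illustrative):
+    #   triad = Shifter.many_shift_chunk(chunk, 44100, 2, [0, 4.0, 7.0])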
+
+ @staticmethod
+ def raw_shift_reader(srcpath, dstpath, shift):
+ """Shift an entire file up or down"""
+ # Open the file and convert it to have SoundTouch's required 2-byte samples
+ reader = AudioReader.open(srcpath)
+ reader2 = ConvertReader(reader, set_raw_width=2)
+
+ # Create the SoundTouch object and set the given shift
+ st = soundtouch.SoundTouch(reader2.sampling_rate(), reader2.channels())
+ st.set_pitch_shift(shift)
+
+ # Create the .WAV file to write the result to
+ writer = wave.open(dstpath, 'w')
+ writer.setnchannels(reader2.channels())
+ writer.setframerate(reader2.sampling_rate())
+ writer.setsampwidth(reader2.raw_width())
+
+ # Read values and feed them into SoundTouch
+ while True:
+ data = reader2.raw_read()
+ if not data:
+ break
+
+ print len(data)
+ st.put_samples(data)
+
+ while st.ready_count() > 0:
+ writer.writeframes(st.get_samples(11025))
+
+ # Flush any remaining values
+ writer.writeframes(Shifter.get_flush(st, reader2.channels()))
+
+ # Clean up
+ writer.close()
+ reader2.close()
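+
+    # Usage sketch for raw_shift_reader above (paths are hypothetical):
+    #   Shifter.raw_shift_reader("in.mp3", "out.wav", 2.0)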
+
+ @staticmethod
+ def get_flush(st, channels, fade=0):
+ """Like soundtouch's flush, don't require that all data comes through, just any.
+ If fade > 0, only allow [fade] samples, and linearly scale volume to 0 over that length"""
+
+ waiting = st.waiting_count()
+ ready = st.ready_count()
+ result = ""
+
+ silence = array('h', [0] * 64)
+
+ while st.ready_count() == ready:
+            st.put_samples(silence.tostring())
+
+ while st.ready_count() > 0:
+ result += st.get_samples(11025)
+
+ st.clear()
+
+ if len(result) > 2 * channels * waiting:
+ result = result[0:(2 * channels * waiting)]
+
+ fade = min(fade, len(result) / 2)
+ if fade > 0:
+ resultstring = ""
+ for ii in xrange(fade / channels):
+ i0 = ii * 2*channels
+ i1 = (ii+1) * 2*channels
+ resultstring += audioop.mul(result[i0:i1], 2, 1 - float(ii) / (fade / channels))
+ result = resultstring
+
+ return result
+
+ @staticmethod
+ def bpm_detect_file(fullpath):
+ """Detect the beat from an entire file"""
+ reader = AudioReader.open(fullpath)
+ reader2 = ConvertReader(reader, set_raw_width=2)
+
+ bd = soundtouch.BPMDetect(reader2.sampling_rate(), reader2.channels())
+
+ while True:
+ data = reader2.raw_read()
+ if not data:
+ break
+
+ bd.put_samples(data)
+
+ reader2.close()
+
+ return bd.get_bpm()
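+
+    # Usage sketch for bpm_detect_file above (the path is hypothetical):
+    #   print Shifter.bpm_detect_file("song.mp3")   # e.g. 127.3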
+
+ @staticmethod
+ def echocancel(outputdata, inputdata):
+ """Try to identify an echo and remove it.
+        Both fragments should contain 2-byte samples."""
+ pos = audioop.findmax(outputdata, 800)
+ out_test = outputdata[pos*2:]
+ in_test = inputdata[pos*2:]
+ ipos, factor = audioop.findfit(in_test, out_test)
+ factor = audioop.findfactor(in_test[ipos*2:ipos*2+len(out_test)], out_test)
+ prefill = '\0'*(pos+ipos)*2
+ postfill = '\0'*(len(inputdata) - len(prefill) - len(outputdata))
+ outputdata = prefill + audioop.mul(outputdata, 2, -factor) + postfill
+ return audioop.add(inputdata, outputdata, 2)
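+
+    # Usage sketch for echocancel above: outputdata is what was played,
+    # inputdata is what the microphone recorded (both strings of 2-byte samples):
+    #   cleaned = Shifter.echocancel(played_str, recorded_str)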
+
+ @staticmethod
+ def beats_to_ms(bpm, beats):
+ """Convert from bpm at a given beat rate to ms between beats."""
+ return 60 * 1000 * beats / bpm
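+
+    # e.g. beats_to_ms(120.0, 4) -> 2000.0: four beats at 120 bpm last two seconds.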
+
+ @staticmethod
+ def find_division_start(fullpath, bpm, beats_per):
+ """Identify the start of the beats, by finding segments that fit together"""
+ reader = AudioReader.open(fullpath)
+
+        # This doesn't find the exact time of the max, but we don't need it exactly.
+ max_value = 0
+ max_time = 0
+ while True:
+ data = reader.raw_read()
+ if data is None:
+ break
+
+ data_max = audioop.max(data, reader.raw_width())
+ if data_max > max_value:
+ max_value = data_max
+ max_time = reader.current_time()
+
+ before = max_time - Shifter.beats_to_ms(bpm, beats_per)
+ after = max_time + 2 * Shifter.beats_to_ms(bpm, beats_per)
+ if before < 0:
+ after += -before
+ before = 0
+ if after > reader.duration():
+ before -= after - reader.duration()
+ after = reader.duration()
+ if before < 0:
+ if beats_per < 2:
+ raise RuntimeError('This audio file is too short to be divided by beats.')
+ else:
+ reader.close()
+                return Shifter.find_division_start(fullpath, bpm, int(beats_per / 2))
+
+ reader.seek_time(0)
+ reader2 = ConvertReader(reader, set_raw_width=2, set_channels=1)
+ region = reader2.raw_random_read(before, after)
+
+ # both in bytes
+ raw_length = 2 * int(len(region) / 6)
+ beat_length = int(2 * Shifter.beats_to_ms(bpm, 1) * reader2.sampling_rate() / 1000.0)
+
+ print "Around max: " + str(before) + " - " + str(after) + ": " + str(raw_length)
+
+ min_factor = 0
+ min_ii = 0
+ # First determine time within a beat
+ for ii in xrange(beat_length / 200):
+ factor = audioop.findfactor(region[200*ii:200*ii+raw_length], region[200*ii+raw_length:200*ii+2*raw_length])
+ if factor < min_factor:
+ print "Samp: At " + str(ii) + " " + str(factor)
+ min_factor = factor
+ min_ii = ii
+
+ # Second, determine which beat to use
+ min_factor = 0
+ min_jj = 0
+ for jj in xrange(beats_per):
+ factor = audioop.findfactor(region[jj*beat_length+200*min_ii:jj*beat_length+200*min_ii+raw_length], region[jj*beat_length+200*min_ii+raw_length:jj*beat_length+200*min_ii+2*raw_length])
+ print "Beat: At " + str(jj) + " " + str(factor)
+ if factor < min_factor:
+ min_factor = factor
+ min_jj = jj
+
+ print "Best: Beat: " + str(min_jj) + ", Samp: " + str(100*min_ii)
+        # beat_length is in bytes (2 bytes per sample), so halve it when converting to samples
+        start_time = before + (min_jj*beat_length/2 + 100*min_ii) * 1000.0 / reader2.sampling_rate()
+ reader2.close()
+
+ return math.fmod(start_time, Shifter.beats_to_ms(bpm, beats_per))