summaryrefslogtreecommitdiff
path: root/pysoundtouch/tools/ReadAudio.py
diff options
context:
space:
mode:
Diffstat (limited to 'pysoundtouch/tools/ReadAudio.py')
-rw-r--r--pysoundtouch/tools/ReadAudio.py616
1 files changed, 616 insertions, 0 deletions
diff --git a/pysoundtouch/tools/ReadAudio.py b/pysoundtouch/tools/ReadAudio.py
new file mode 100644
index 0000000..62fc078
--- /dev/null
+++ b/pysoundtouch/tools/ReadAudio.py
@@ -0,0 +1,616 @@
+import mad, wave, aifc, sunau, time
+import Image, ImageDraw, math
+from array import array
+import audioop
+
+### Abstract AudioReader class
class AudioReader:
    """Abstract base class for audio sources that deliver PCM samples.

    Subclasses supply the format-specific primitives (marked OVERRIDE
    REQUIRED below); this class builds time-ranged reads and a waveform
    renderer on top of them.  All times are expressed in milliseconds.
    Sample data comes in two flavours: "raw" byte strings (raw_read) and
    channel-interleaved array.array objects (read).
    """

    @staticmethod
    def open(filepath):
        """Tries to determine the format of the file, and open it with an appropriate AudioReader subclass.

        Returns None when the extension is not recognised."""
        reader = AudioReader.reader(filepath)
        if not reader:
            return None
        return reader(filepath)

    @staticmethod
    def reader(filepath):
        """Tries to determine the format of the file and returns an appropriate AudioReader subclass.

        The decision is made purely on the (case-insensitive) file
        extension; None is returned for anything else."""
        filelow = filepath.lower()
        if filelow.endswith('.mp3'):
            return MP3Reader
        if filelow.endswith(('.wav', '.aif', '.aiff', '.au')):
            return PCMReader
        return None

    # All AudioReader objects keep track of end-of-file flags and if there are leftovers from a read operation
    def __init__(self, filepath):
        self.filepath = filepath
        self.eof = False
        self.leftovers = []  # leftovers from random_read/continue_read

    # Call the close function on deallocation
    def __del__(self):
        # Best effort only: a destructor must never raise, and close() may
        # fail on a partially constructed object.
        try:
            self.close()
        except Exception:
            pass

    # OVERRIDE REQUIRED
    def channels(self):
        """Return the number of interleaved channels per frame"""
        raise NotImplementedError("channels() must be provided by a subclass")

    # OVERRIDE REQUIRED
    def sampling_rate(self):
        """Return the samples (frames) per second"""
        return 0

    # OVERRIDE REQUIRED
    def duration(self):
        """Return the duration in ms"""
        return 0

    # OVERRIDE REQUIRED
    def current_time(self):
        """Return the current time in ms"""
        return 0

    # OVERRIDE REQUIRED
    def seek_time(self, time):
        """Set the read pointer to the specified time (in ms)"""
        pass

    # OVERRIDE REQUIRED
    def raw_width(self):
        """Return the width in bytes of raw samples"""
        pass

    # OVERRIDE REQUIRED
    def raw_read(self):
        """Return some amount of data as a raw audio string"""
        pass

    def has_unsigned_singles(self):
        """Is the raw data when this has a width of 1 stored in unsigned bytes (but not for higher widths)"""
        return False

    # OVERRIDE REQUIRED
    def read(self):
        """Return some number of frames of an channel-interleaved array (len = NxC) of the appropriate sample depth"""
        pass

    def close(self):
        """Perform any necessary cleanup on deallocation."""
        pass

    def random_read(self, start, end, debugs=None):
        """Return the samples between start and end (both in ms).

        Seeks only when the read head is not already at start.  Returns
        None when the source has no data left.  debugs, when given, is a
        list that receives human-readable trace messages."""
        if self.current_time() != start:
            self.seek_time(start)
        lenout = int((end - start) * self.sampling_rate() / 1000.0) * self.channels()
        return self.length_read(lenout, debugs)

    def continue_read(self, end, debugs=None):
        """Continue reading from the current read head up to end (in ms).

        Samples buffered by the previous read (self.leftovers) are
        returned first.  Returns an array of interleaved samples, or None
        when nothing at all is left."""
        if debugs is not None:
            debugs.append("Continue from " + str(len(self.leftovers)) + " + " + str(self.current_time()) + " to " + str(end))

        if not self.leftovers:
            # Call random_read as necessary
            return self.random_read(self.current_time(), end, debugs)

        # First take any samples from leftovers
        leftovers = self.leftovers
        self.leftovers = []

        channels = self.channels()
        rate = self.sampling_rate()
        # The read head already sits past the buffered samples, so the
        # logical start of this read is that many frames earlier.
        buffered_ms = (len(leftovers) // channels) * 1000.0 / rate
        wanted = int((end - (self.current_time() - buffered_ms)) * rate / 1000.0) * channels

        if wanted <= len(leftovers):
            # The buffer alone satisfies the request.  Asking length_read
            # for a negative count would slice from the wrong end.
            wanted = max(wanted, 0)
            self.leftovers = leftovers[wanted:]
            return leftovers[:wanted]

        after = self.length_read(wanted - len(leftovers), debugs)
        if after is None:
            # Source exhausted: the buffered samples are all that remain.
            return leftovers

        result = array(after.typecode)
        result.extend(leftovers)
        result.extend(after)
        return result

    def length_read(self, lenout, debugs=None):
        """Read a given number of samples, by repeated calls to read().

        Fewer samples are returned if the source runs out; None is
        returned if the very first read() yields nothing.  Any surplus
        from the last chunk is stored in self.leftovers."""
        lenout = max(lenout, 0)  # a negative count would corrupt the slices below
        result = self.read()
        if result is None:
            return None
        while len(result) < lenout:
            data = self.read()
            if data is None:
                break
            result.extend(data)

        # Put any extra samples in leftovers
        if len(result) > lenout:
            self.leftovers = result[lenout:]
            result = result[:lenout]
        else:
            self.leftovers = []

        if debugs is not None:
            debugs.append("length_read: Got " + str(lenout) + " at " + str(self.current_time()))

        return result

    def raw_random_read(self, start, end):
        """Return the raw samples between start and end (both in ms).

        The final chunk is trimmed, on a whole-frame boundary, in
        proportion to how far it overshoots end.  Returns None when the
        first raw_read() yields nothing.
        XXX: Consider converting to random_read-style length-based limiter"""
        self.seek_time(start)
        result = self.raw_read()
        if result is None:
            return None

        frame_bytes = self.raw_width() * self.channels()
        before = self.current_time()
        if before > end:
            # float() keeps the ratio from truncating to 0 when the times
            # are integers (Python 2 integer division).
            fraction = (float(end - start) / (before - start)) / frame_bytes
            return result[:frame_bytes * int(fraction * len(result))]

        while before < end:
            data = self.raw_read()
            if data is None:
                break
            after = self.current_time()
            if after > end:
                fraction = (float(end - before) / (after - before)) / frame_bytes
                data = data[:frame_bytes * int(fraction * len(data))]

            result += data
            before = after

        return result

    def audio_to_image(self, filepath, width, height, divisor=0, dividers=None, start=0, end=None):
        """Construct a graph of the samples and save it to filepath as a PNG.

        The first channel is drawn above the centre line and the second
        below it (a mono source is mirrored).  width and height are the
        image size in pixels.  divisor scales mean absolute amplitude to
        pixels; 0 picks a default from the sample size.  dividers is an
        optional sequence of positions drawn as red vertical lines.
        start and end bound the rendered range in ms (end defaults to the
        full duration).  Reading stops after roughly ten seconds of clock
        time.  Returns the largest per-pixel mean amplitude seen.
        """
        if end is None:
            end = self.duration()

        if start > 0:
            self.seek_time(start)
            lastRecorded = self.current_time()
        else:
            self.seek_time(0)
            lastRecorded = start

        # time.clock no longer exists on recent interpreters
        clock = getattr(time, "clock", None) or time.perf_counter
        time0 = clock()

        channels = self.channels()
        rate = self.sampling_rate()
        middle = height // 2

        ticksPerPixel = (end - start) / float(width)
        maxEnergy = 0
        pointsAbove = [0, middle]  # Use for left channel; flat list of x, y
        pointsBelow = []  # Use for right channel; stored y, x because it is reversed before drawing
        # A negative start is padded with a flat line until time zero
        while lastRecorded < 0:
            pointsAbove.append(len(pointsAbove) // 2)
            pointsAbove.append(middle + 1)
            pointsBelow.append(middle)
            pointsBelow.append(len(pointsBelow) // 2)
            lastRecorded += ticksPerPixel

        while clock() - time0 < 10 and self.current_time() < end:
            data = self.read()
            if data is None:
                break
            if divisor == 0:
                divisor = pow(256, data.itemsize) / (2*math.sqrt(2))

            # Offsets into this chunk are measured from where it began
            startLoop = lastRecorded
            while self.current_time() > lastRecorded + ticksPerPixel and lastRecorded < end:
                first = channels * int((lastRecorded - startLoop) * rate / 1000.0)
                last = channels * int((lastRecorded - startLoop + ticksPerPixel) * rate / 1000.0)
                segment = data[first:last]
                energyLeft = 0
                energyRight = 0
                for ii in range(len(segment) // channels):
                    energyLeft += abs(segment[ii * channels])
                    if channels == 1:
                        energyRight += abs(segment[ii])
                    else:
                        energyRight += abs(segment[ii * channels + 1])

                # Turn the sums into per-frame means
                if len(segment) > 0:
                    energyLeft *= float(channels) / len(segment)
                    energyRight *= float(channels) / len(segment)

                # Grow the divisor so the trace never leaves the image
                if energyLeft / divisor > .5:
                    divisor = 2 * energyLeft
                if energyRight / divisor > .5:
                    divisor = 2 * energyRight
                maxEnergy = max(maxEnergy, energyLeft, energyRight)

                pointsAbove.append(len(pointsAbove) // 2)
                pointsAbove.append(middle - height * energyLeft / divisor)
                pointsBelow.append(middle + height * energyRight / divisor)
                pointsBelow.append(len(pointsBelow) // 2)
                lastRecorded += ticksPerPixel
            # Recompute from the pixel count rather than the running sum
            lastRecorded = (len(pointsAbove) // 2) * ticksPerPixel + start

        if maxEnergy < divisor / 3.0 and maxEnergy > 0:
            # Try again, with a lower divisor
            reader = AudioReader.open(self.filepath)
            if reader:
                return reader.audio_to_image(filepath, width, height, divisor=2 * maxEnergy, dividers=dividers, start=start, end=end)

        pointsAbove.append(width)
        pointsAbove.append(middle)
        image = Image.new("RGB", (width, height), "Black")
        draw = ImageDraw.Draw(image)
        pointsBelow.reverse()
        pointsAbove.extend(pointsBelow)
        draw.polygon(pointsAbove, fill="Blue")

        print(lastRecorded)

        if dividers:
            for divider in dividers:
                xpos = width * divider * 1000.0 / self.duration()
                draw.line([(xpos, 0), (xpos, height)], fill="Red")

        del draw

        print(filepath)
        # PNG is binary data; the with-block also guarantees the handle is closed
        with open(filepath, "wb") as out:
            image.save(out, "PNG")
        return maxEnergy
+
+### Reader of MP3 files
class MP3Reader(AudioReader):
    """AudioReader that decodes MP3 files through the mad module."""

    def __init__(self, filepath):
        # Use mad to read the MP3 file.
        AudioReader.__init__(self, filepath)
        self.mf = mad.MadFile(filepath)

    def channels(self):
        """Return the number of interleaved channels per frame"""
        # mad always returns a dual-channel stream
        return 2

    def sampling_rate(self):
        """Return the samples (frames) per second"""
        return self.mf.samplerate()

    def duration(self):
        """Return the duration in ms"""
        return self.mf.total_time()

    def current_time(self):
        """Return the current time in ms"""
        return self.mf.current_time()

    def seek_time(self, time):
        """Set the read pointer to the specified time (in ms)"""
        self.mf.seek_time(time)

    def raw_width(self):
        """Return the width in bytes of raw samples"""
        return 2

    def raw_read(self):
        """Return some amount of data as a raw audio string, or None at end of file"""
        buf = self.mf.read()
        if buf is None:
            self.eof = True
            return None

        return buf

    def read(self):
        """Return the next chunk as an interleaved array of 16-bit samples, or None when no data is left"""
        buf = self.raw_read()
        if not buf:
            return None

        short_array = array('h')
        # fromstring is absent on newer interpreters, frombytes on older ones
        load = getattr(short_array, "frombytes", None) or short_array.fromstring
        load(buf)
        return short_array

    def close(self):
        """Drop this reader's reference to the decoder; safe to call repeatedly."""
        # Rebinding instead of `del` keeps a second close() (for example the
        # one issued from __del__) from raising AttributeError.
        self.mf = None
+
+### Reader for a simple PCM-based file format
class PCMReader(AudioReader):
    """AudioReader for uncompressed WAV, AIFF and AU files.

    The file is handled by the wave, aifc or sunau module, chosen from
    the file extension, and is read about a tenth of a second at a time.
    """

    def __init__(self, filepath):
        AudioReader.__init__(self, filepath)
        lowered = filepath.lower()
        if lowered.endswith((".aif", ".aiff")):
            # The module is aifc; there is no module called aiff
            self.wf = aifc.open(self.filepath)
        elif lowered.endswith('.au'):
            self.wf = sunau.open(self.filepath)
        else:
            self.wf = wave.open(self.filepath)
        self.framesread = 0  # frames consumed so far; drives current_time()
        # Never request zero frames: an empty read is treated as end of file
        self.frames_per_read = max(1, self.wf.getframerate() // 10)

    def channels(self):
        """Return the number of interleaved channels per frame"""
        return self.wf.getnchannels()

    def sampling_rate(self):
        """Return the samples (frames) per second"""
        return self.wf.getframerate()

    def duration(self):
        """Return the duration in ms, rounded to a whole number"""
        return round((1000.0 * self.wf.getnframes()) / self.wf.getframerate())

    def current_time(self):
        """Return the current time in ms, rounded to a whole number"""
        return round((1000.0 * self.framesread) / self.wf.getframerate())

    def seek_time(self, time):
        """Set the read pointer to the specified time (in ms)

        Raises ValueError when the time lies beyond the end of the file
        or the underlying module rejects the computed position."""
        if time == 0:
            self.wf.rewind()
            self.framesread = 0
            return

        # Check the step size: how far tell() advances for a single frame
        self.wf.rewind()
        zero = self.wf.tell()
        self.wf.readframes(1)
        one = self.wf.tell()

        # We just have to guess, and hope we're right (no way to check!)
        gotoframe = int(time * self.wf.getframerate() / 1000.0)
        if gotoframe > self.wf.getnframes():
            raise ValueError(str(time) + " is beyond " + str(self.duration()))

        # The per-frame step is the difference of the two probes, not the
        # second probe itself; the two only coincide when zero == 0.
        gotopos = zero + gotoframe * (one - zero)
        try:
            self.wf.setpos(gotopos)
        except Exception:
            raise ValueError("Cannot go to " + str(time) + " with " + str(zero) + ":" + str(one))
        self.framesread = gotoframe

    def raw_width(self):
        """Return the width in bytes of raw samples"""
        return self.wf.getsampwidth()

    def raw_read(self):
        """Return some amount of data as a raw audio string, or None at end of file"""
        buf = self.wf.readframes(self.frames_per_read)
        if not buf:
            self.eof = True
            return None

        # Count the frames actually delivered; the final chunk is usually
        # shorter than requested.
        self.framesread += len(buf) // (self.wf.getsampwidth() * self.wf.getnchannels())

        return buf

    def has_unsigned_singles(self):
        """Is the raw data when this has a width of 1 stored in unsigned bytes (but not for higher widths)"""
        return self.filepath.lower().endswith(".wav")

    def read(self):
        """Return the next chunk as an interleaved array, or None at end of file

        The array typecode is 'b', 'h' or 'i' for sample widths of 1, 2
        and anything else respectively."""
        buf = self.raw_read()
        if not buf:
            return None

        # NOTE(review): the bytes are interpreted in native byte order and
        # one-byte samples as signed; confirm this suits AIFF/AU data and
        # 8-bit WAV data (see has_unsigned_singles).
        if self.wf.getsampwidth() == 1:
            data_array = array('b')
        elif self.wf.getsampwidth() == 2:
            data_array = array('h')
        else:
            data_array = array('i')
        # fromstring is absent on newer interpreters, frombytes on older ones
        load = getattr(data_array, "frombytes", None) or data_array.fromstring
        load(buf)
        return data_array

    def close(self):
        """Close the underlying audio file"""
        self.wf.close()
+
+### Convert the samples from one AudioReader into another format
class ConvertReader(AudioReader):
    """Wrap another AudioReader and convert its samples on the fly.

    Channel count, sampling rate and sample width can each be overridden;
    whatever is left as None is passed through from the source.
    """

    def __init__(self, source, set_channels=None, set_sampling_rate=None, set_raw_width=None):
        """Constructor:
        source is an AudioReader; give set_channels, set_sampling_rate, and set_raw_width based on what you want to change."""
        AudioReader.__init__(self, source.filepath)
        self.source = source
        self.set_channels = set_channels
        self.set_sampling_rate = set_sampling_rate
        self.set_raw_width = set_raw_width
        self.ratecv_state = None  # carried between audioop.ratecv calls

    def channels(self):
        """Return the output channel count"""
        return self.set_channels or self.source.channels()

    def sampling_rate(self):
        """Return the output samples (frames) per second"""
        return self.set_sampling_rate or self.source.sampling_rate()

    def duration(self):
        """Return the duration in ms, as reported by the source"""
        return self.source.duration()

    def current_time(self):
        """Return the current time in ms, as reported by the source"""
        return self.source.current_time()

    def seek_time(self, time):
        """Set the read pointer to the specified time (in ms)"""
        self.source.seek_time(time)
        # The resampler state describes audio before the jump; carrying it
        # across a seek would blend unrelated samples into the next chunk.
        self.ratecv_state = None

    def raw_width(self):
        """Return the width in bytes of raw samples"""
        return self.set_raw_width or self.source.raw_width()

    def raw_read(self):
        """Return some amount of converted data as a raw audio string, or None at end of file"""
        buf = self.source.raw_read()
        if buf is None:
            self.eof = True
            return None

        # Convert channels as needed
        if self.set_channels and self.source.channels() != self.set_channels:
            if self.set_channels == 1:
                buf = audioop.tomono(buf, self.source.raw_width(), .5, .5)
            else:
                buf = audioop.tostereo(buf, self.source.raw_width(), 1, 1)

        # Convert sampling rate as needed
        if self.set_sampling_rate and self.source.sampling_rate() != self.set_sampling_rate:
            (buf, self.ratecv_state) = audioop.ratecv(buf, self.source.raw_width(), self.channels(), self.source.sampling_rate(), self.set_sampling_rate, self.ratecv_state)

        # Convert sample width as needed; unsigned one-byte samples are
        # shifted to signed before widening and back after narrowing.
        if self.set_raw_width and self.source.raw_width() != self.set_raw_width:
            if self.source.raw_width() == 1 and self.source.has_unsigned_singles():
                buf = audioop.bias(buf, 1, -128)
            buf = audioop.lin2lin(buf, self.source.raw_width(), self.set_raw_width)
            if self.set_raw_width == 1 and self.source.has_unsigned_singles():
                buf = audioop.bias(buf, 1, 128)

        return buf

    def has_unsigned_singles(self):
        """Is the raw data when this has a width of 1 stored in unsigned bytes (but not for higher widths)"""
        return self.source.has_unsigned_singles()

    def read(self):
        """Return the next converted chunk as an interleaved array, or None at end of file"""
        # raw_read handles all basic conversion
        buf = self.raw_read()
        if not buf:
            # Without this guard the end of the file surfaced as a TypeError
            return None

        # Pick the array type matching the output width
        if self.raw_width() == 1:
            data_array = array('b')
        elif self.raw_width() == 2:
            data_array = array('h')
        else:
            data_array = array('i')

        # fromstring is absent on newer interpreters, frombytes on older ones
        load = getattr(data_array, "frombytes", None) or data_array.fromstring
        load(buf)
        return data_array

    def close(self):
        """Close the wrapped source"""
        self.source.close()
+
+### Scale the audio (volume) in an AudioReader
class ScaleReader(AudioReader):
    """Wrap another AudioReader, multiplying and offsetting every sample."""

    def __init__(self, source, scale=1.0, bias=0):
        """Constructor:
        source is the AudioReader to wrap; a scale above 1 makes the audio louder; bias is a constant offset added to each sample (inaudible, but useful for removing clicks)."""
        AudioReader.__init__(self, source.filepath)
        self.source = source
        self.scale = scale
        self.bias = bias

    # Format queries and positioning are delegated to the wrapped source

    def channels(self):
        return self.source.channels()

    def sampling_rate(self):
        return self.source.sampling_rate()

    def duration(self):
        return self.source.duration()

    def current_time(self):
        return self.source.current_time()

    def seek_time(self, time):
        """Move the read pointer to the given time (in ms)"""
        self.source.seek_time(time)

    def raw_width(self):
        """Width in bytes of one raw sample"""
        return self.source.raw_width()

    def has_unsigned_singles(self):
        """Whether one-byte raw samples are unsigned, as reported by the source"""
        return self.source.has_unsigned_singles()

    def raw_read(self):
        """Fetch the next raw chunk from the source and apply scale, then bias"""
        chunk = self.source.raw_read()
        if chunk is None:
            self.eof = True
            return None

        # Identity settings leave the data untouched
        if self.scale != 1.0:
            chunk = audioop.mul(chunk, self.source.raw_width(), self.scale)
        if self.bias != 0:
            chunk = audioop.bias(chunk, self.source.raw_width(), self.bias)
        return chunk

    def read(self):
        """Return the next adjusted chunk as an interleaved array, or None when no data is left"""
        # raw_read has already done the scaling and biasing
        chunk = self.raw_read()
        if not chunk:
            return None

        # Widths of 1 and 2 bytes have their own typecode; the rest use 'i'
        width = self.raw_width()
        if width == 1:
            typecode = 'b'
        elif width == 2:
            typecode = 'h'
        else:
            typecode = 'i'

        samples = array(typecode)
        samples.fromstring(chunk)
        return samples

    def close(self):
        self.source.close()
+
+### Concatenate two audio files
class AppendReader(AudioReader):
    """Present two audio files as a single concatenated stream.

    The second file is wrapped in a ConvertReader so that its channel
    count, sampling rate and sample width match the first.
    """

    def __init__(self, one_path, two_path):
        """Constructor: give two paths to be opened and concatenated.

        Raises ValueError when either path has an unsupported extension."""
        AudioReader.__init__(self, one_path)
        self.one_source = AudioReader.open(one_path)
        if self.one_source is None:
            raise ValueError("Unsupported audio format: " + str(one_path))
        second = AudioReader.open(two_path)
        if second is None:
            raise ValueError("Unsupported audio format: " + str(two_path))
        # Convert the second file to be like the first
        self.two_source = ConvertReader(second, self.one_source.channels(), self.one_source.sampling_rate(), self.one_source.raw_width())
        # Position in the combined stream, in ms.  This must not be called
        # current_time: an instance attribute of that name would hide the
        # current_time() method.
        self._position = 0

    def channels(self):
        """Return the number of interleaved channels per frame"""
        return self.one_source.channels()

    def sampling_rate(self):
        """Return the samples (frames) per second"""
        return self.one_source.sampling_rate()

    def duration(self):
        """Return the combined duration of both files in ms"""
        return self.one_source.duration() + self.two_source.duration()

    def current_time(self):
        """Return the current time in ms within the combined stream"""
        return self._position

    def seek_time(self, time):
        """Set the read pointer to the specified time (in ms)"""
        # Seek to one file or the other
        first_length = self.one_source.duration()
        if time < first_length:
            self.one_source.seek_time(time)
            # Make sure the second file starts from its beginning once the
            # first one runs out.
            self.two_source.seek_time(0)
        else:
            self.two_source.seek_time(time - first_length)
        self._position = time

    def raw_width(self):
        """Return the width in bytes of raw samples"""
        return self.one_source.raw_width()

    def raw_read(self):
        """Return some amount of data as a raw audio string, or None at the end of the second file"""
        first_length = self.one_source.duration()
        if self._position < first_length:
            # Read from the first audio source
            buf = self.one_source.raw_read()
            if buf is not None:
                self._position = self.one_source.current_time()
                return buf
            # The first source ran dry early; carry on with the second

        # Read from the second audio source
        buf = self.two_source.raw_read()
        if buf is None:
            self.eof = True
            return None

        self._position = first_length + self.two_source.current_time()
        return buf

    def has_unsigned_singles(self):
        """Is the raw data when this has a width of 1 stored in unsigned bytes (but not for higher widths)"""
        return self.one_source.has_unsigned_singles()

    def read(self):
        """Return the next chunk as an interleaved array, or None at the end of the stream"""
        buf = self.raw_read()
        if not buf:
            # Without this guard the end of the stream surfaced as a TypeError
            return None

        if self.one_source.raw_width() == 1:
            data_array = array('b')
        elif self.one_source.raw_width() == 2:
            data_array = array('h')
        else:
            data_array = array('i')

        # fromstring is absent on newer interpreters, frombytes on older ones
        load = getattr(data_array, "frombytes", None) or data_array.fromstring
        load(buf)
        return data_array

    def close(self):
        """Close both underlying sources"""
        # getattr tolerates an instance whose constructor failed part-way
        for name in ("one_source", "two_source"):
            source = getattr(self, name, None)
            if source is not None:
                source.close()