#!/usr/bin/env python
# Copyright (c) 2018 Thomas J. Garcia
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""pastream: GIL-less Portaudio Streams for Python
"""
from __future__ import print_function as _print_function, division as _division
try: import Queue as _queue
except ImportError: import queue as _queue
import math as _math
import threading as _threading
import time as _time
import sys as _sys
import sounddevice as _sd
import soundfile as _sf
from _py_pastream import ffi as _ffi, lib as _lib
from pa_ringbuffer import _RingBufferBase
# For debugging
## from timeit import default_timer as timer
## gtime = None
__version__ = '0.2.0'
# Set a default size for the audio callback ring buffer
_PA_BUFFERSIZE = 1 << 16
# Determines the blocksize for reading/writing sound files
_FILECHUNKSIZE = 4096
_MAXCHUNKSIZE = 1 << 18
# Private states that determine how a stream completed
_FINISHED = 1
_ABORTED = 2
_STOPPED = 4
_INITIALIZED = 8
# Include xrun flags in nampespace
paInputOverflow = _lib.paInputOverflow
paInputUnderflow = _lib.paInputUnderflow
paOutputOverflow = _lib.paOutputOverflow
paOutputUnderflow = _lib.paOutputUnderflow
[docs]class BufferFull(Exception):
pass
[docs]class BufferEmpty(Exception):
pass
[docs]class RingBuffer(_RingBufferBase):
_ffi = _ffi
_lib = _lib
__doc__ = _RingBufferBase.__doc__
@classmethod
def _from_pointer(cls, pointer):
class ProxyRingBuffer(cls):
def __init__(self, ptr, data=None):
self._ptr = ptr
self._data = data
return ProxyRingBuffer(pointer)
class _LinearBuffer(RingBuffer):
"""(internal) RingBuffer interface for non-power-of-2 sized buffers
This class is essentially the same as RingBuffer but lies to the underlying
pa_ringbuffer about its total size in order to allowing passing of
arbitrary sized buffers to the py_pastream callback using the same
RingBuffer interface.
The big caveat using this class is that you *cannot* wrap write/read around
the end of the buffer.
"""
def __new__(cls, elementsize, buffer):
try:
data = _ffi.from_buffer(buffer)
except TypeError:
data = buffer
size, rest = divmod(_ffi.sizeof(data), elementsize)
if rest:
raise ValueError('buffer size must be multiple of elementsize')
falsesize = 1 << int(_math.ceil( _math.log(size, 2) )) if size else 0
if falsesize == size:
# This is a power of 2 buffer so just create a regular RingBuffer
return RingBuffer(elementsize, buffer=data)
self = super(_LinearBuffer, cls).__new__(cls)
self._data = data
self._ptr = _ffi.new('PaUtilRingBuffer*')
rcode = _lib.PaUtil_InitializeRingBuffer(
self._ptr, elementsize, falsesize, self._data)
assert rcode == 0
# we manually assign the buffersize; this will keep us from going over
# the buffer bounds
self._ptr.bufferSize = size
return self
def __init__(self, elementsize, buffer):
pass
# Round up to the closest multiple of unit
def _unitceil(x, unit):
return unit * ((x + unit - 1) // unit)
# Default handler for writing input from a Stream to a SoundFile object
def _soundfilerecorder(stream, rxbuffer, inp_fh):
try:
latency = stream.latency[0]
dtype = stream.dtype[0]
except TypeError:
latency = stream.latency
dtype = stream.dtype
## global gtime
periodsize = max(int(round(latency * stream.samplerate)), stream.blocksize)
maxframes = _MAXCHUNKSIZE // rxbuffer.elementsize
# Set chunksize to a multiple of _FILECHUNKSIZE
chunksize = _unitceil(periodsize + _FILECHUNKSIZE, periodsize)
chunksize = min(len(rxbuffer), maxframes, chunksize)
sleeptime = (chunksize - rxbuffer.read_available + stream._offset) / stream.samplerate
if sleeptime > 0:
_time.sleep(sleeptime)
sleeptime = max(chunksize - periodsize // 8, periodsize) / stream.samplerate
## _sys.stderr.write('r:%d r:%d ' % (periodsize, max(chunksize - periodsize // 8, periodsize)))
while not stream.aborted:
# for thread safety, check the stream is active *before* reading
active = stream.active
frames = min(rxbuffer.read_available, maxframes)
if frames == 0:
# we've read everything and the stream is done; seeya!
if not active:
break
## stream._rmisses += 1
## _time.sleep(latency)
## continue
## print('1', timer() - gtime, frames)
frames, buffregn1, buffregn2 = rxbuffer.get_read_buffers(frames)
inp_fh.buffer_write(buffregn1, dtype=dtype)
if len(buffregn2):
inp_fh.buffer_write(buffregn2, dtype=dtype)
rxbuffer.advance_read_index(frames)
_time.sleep(sleeptime)
# Default handler for reading input from a SoundFile object and
# writing it to a Stream
def _soundfileplayer(stream, txbuffer, out_fh, loop=False):
try:
latency = stream.latency[1]
dtype = stream.dtype[1]
except TypeError:
latency = stream.latency
dtype = stream.dtype
## global gtime
periodsize = max(int(round(latency * stream.samplerate)), stream.blocksize)
maxframes = _MAXCHUNKSIZE // txbuffer.elementsize
# Set chunksize to a multiple of _FILECHUNKSIZE
chunksize = _unitceil(periodsize + _FILECHUNKSIZE, periodsize)
chunksize = min(len(txbuffer), maxframes, chunksize)
sleeptime = max(chunksize - periodsize // 8, periodsize) / stream.samplerate
readinto = out_fh.buffer_read_into
## _sys.stderr.write('w:%d w:%d ' % (periodsize, max(chunksize - periodsize // 8, periodsize)))
while not stream.finished:
frames = min(txbuffer.write_available, maxframes)
## if frames == 0:
## stream._wmisses += 1
## print('0', timer() - gtime, frames)
frames, buffregn1, buffregn2 = txbuffer.get_write_buffers(txbuffer.write_available)
readframes = readinto(buffregn1, dtype=dtype)
if len(buffregn2):
readframes += readinto(buffregn2, dtype=dtype)
if loop:
# grab a memoryview to avoid copies
buffregn1 = memoryview(buffregn1)
buffregn2 = memoryview(buffregn2)
while readframes < frames:
out_fh.seek(0)
readbytes = readframes * txbuffer.elementsize
if readbytes < len(buffregn1):
readframes += readinto(buffregn1[readbytes:], dtype=dtype)
else:
readframes += readinto(buffregn2[readbytes-len(buffregn1):], dtype=dtype)
txbuffer.advance_write_index(readframes)
if readframes < frames:
break
_time.sleep(sleeptime)
# convert a generic channel mapping into the format py_pastream expects
def _get_cmapping(mapping, channels):
if not isinstance(mapping, dict):
nonzeromapping = [m for m in mapping if m > 0]
mapping = zip(mapping, range(1, len(mapping) + 1))
if len(set(nonzeromapping)) < len(nonzeromapping):
raise ValueError("bad channel mapping; a channel was repeated")
else:
mapping = mapping.items()
cmapping = [0] * channels
for a, b in mapping:
if b <= 0:
raise ValueError("bad channel mapping; output channel must always be > 0")
if a > len(cmapping):
raise ValueError("bad channel mapping; input channel out of range")
cmapping[b-1] = a
return cmapping
# TODO?: add option to do asynchronous exception raising
[docs]class Stream(_sd._StreamBase):
"""Base stream class from which all other stream classes derive.
Note that this class inherits from :mod:`sounddevice`'s ``_StreamBase``
class.
"""
_soundfileplayer = staticmethod(_soundfileplayer)
_soundfilerecorder = staticmethod(_soundfilerecorder)
def __init__(self, kind, device=None, samplerate=None, channels=None,
dtype=None, blocksize=None, **kwargs):
# Set up the C portaudio callback
self._rxbuffer = self._txbuffer = None
self._cstream = _ffi.NULL
self._crefs = []
if kwargs.get('callback', None) is None:
# Init the C PyPaStream object
self._cstream = _ffi.new("Py_PaStream*")
_lib.init_stream(self._cstream)
# Pass our data and callback to sounddevice
kwargs['userdata'] = self._cstream
kwargs['callback'] = _ffi.addressof(_lib, 'callback')
kwargs['wrap_callback'] = None
self.__frames = self._cstream.frames
# These flags are used to tell when the callbacks have finished. We can
# use them to abort writing of the ringbuffer.
self.__statecond = _threading.Condition()
self.__streamlock = _threading.RLock()
self.__state = _INITIALIZED
self.__aborting = False
self.__exceptions = _queue.Queue()
# Set up reader/writer threads
self._rxthread = self._txthread = None
self._txthread_args = self._rxthread_args = None
# TODO: add support for C finished_callback function pointer
self.__finished_callback = kwargs.pop('finished_callback', lambda x: None)
def finished_callback():
# Check for any errors that might've occurred in the callback
msg = _ffi.string(self._cstream.errorMsg).decode('utf-8')
if len(msg):
exctype, excmsg = msg.split(':', 1) if ':' in msg else (msg, '')
exctype = getattr(_sys.modules[__name__], exctype)
self._set_exception(exctype(excmsg))
with self.__statecond:
# It's possible that the callback aborted itself so check if we
# need to update our aborted flag here
if self._cstream.last_callback == _sd._lib.paAbort \
or self.__aborting or not self.__exceptions.empty():
self.__state = _ABORTED | _FINISHED
elif self._cstream.last_callback == _sd._lib.paComplete:
self.__state = _FINISHED
else:
self.__state = _STOPPED | _FINISHED
self.__aborting = False
self.__statecond.notify_all()
self.__finished_callback(self)
super(Stream, self).__init__(kind, blocksize=blocksize,
device=device, samplerate=samplerate, channels=channels,
dtype=dtype, finished_callback=finished_callback, **kwargs)
# DEBUG for measuring polling performance
## self._rmisses = self._wmisses = 0
self._autoclose = False
if kind == 'duplex':
self._cstream.txElementSize = self.samplesize[1] * self.channels[1]
self._cstream.rxElementSize = self.samplesize[0] * self.channels[0]
self._cstream.txchannels = self.channels[1]
self._cstream.rxchannels = self.channels[0]
elif kind == 'output':
self._cstream.txchannels = self.channels
self._cstream.txElementSize = self.samplesize * self.channels
else: #if kind == 'input':
self._cstream.rxchannels = self.channels
self._cstream.rxElementSize = self.samplesize * self.channels
channels = max(self._cstream.txchannels, self._cstream.rxchannels)
self._crefs.append(_ffi.new('unsigned char[]', [0]*channels))
self._cstream._mapping = self._crefs[-1]
if self._cstream.txchannels:
self._crefs.append(_ffi.new('unsigned char[]', list(range(1, self._cstream.txchannels+1))))
self._cstream.txmapping = self._crefs[-1]
if self._cstream.rxchannels:
self._crefs.append(_ffi.new('unsigned char[]', list(range(1, self._cstream.rxchannels+1))))
self._cstream.rxmapping = self._crefs[-1]
def __stopiothreads(self):
# !This function is *not* thread safe!
currthread = _threading.current_thread()
if self._rxthread is not None and self._rxthread.is_alive() \
and self._rxthread != currthread:
self._rxthread.join()
if self._txthread is not None and self._txthread.is_alive() \
and self._txthread != currthread:
self._txthread.join()
# TODO: add ability to re-enter rwfunc without having to recreate the thread
def _readwritewrapper(self, rwfunc, *args, **kwargs):
"""\
Wrapper for the reader and writer functions which acts as a kind
of context manager.
"""
try:
rwfunc(self, *args, **kwargs)
except:
# Defer the exception and delegate to the owner thread
self._set_exception()
self.abort()
def _reraise_exceptions(self):
"""\
Raise the last deferred exception if one exists. If the caller's
thread is not the stream owner this function does nothing.
"""
currthread = _threading.current_thread()
if currthread is self._rxthread or currthread is self._txthread:
return
try:
exc = self.__exceptions.get(block=False)
except _queue.Empty:
return
# To simplify things, we only care about the first exception raised
self.__exceptions.queue.clear()
if isinstance(exc, tuple):
exctype, excval, exctb = exc
if exctype is not None:
excval = exctype(excval)
if hasattr(excval, 'with_traceback'):
raise excval.with_traceback(exctb)
else:
exec("raise excval, None, exctb")
else:
raise exc
def _set_exception(self, exc=None):
"""\
Queue an exception to be re-raised later using `_reraise_exceptions`.
"""
try:
self.__exceptions.put(exc or _sys.exc_info(), block=False)
except _queue.Full:
pass
def _allocate_buffer(self, size, kind, atleast_2d=False, bufferclass=bytearray):
isoutput = kind == 'output'
try:
channels = self.channels[isoutput]
samplesize = self.samplesize[isoutput]
dtype = self.dtype[isoutput]
except TypeError:
channels = self.channels
samplesize = self.samplesize
dtype = self.dtype
if issubclass(bufferclass, _RingBufferBase):
return bufferclass(channels * samplesize, size)
try:
import numpy
return numpy.zeros((size, channels) if atleast_2d or channels > 1
else size * channels, dtype=dtype)
except ImportError:
return bufferclass(size * channels * samplesize)
[docs] @classmethod
def from_file(cls, file, *args, **kwargs):
"""Create a stream using the charecteristics of a soundfile
Parameters
----------
file : SoundFile or str or int or file-like object
Other Parameters
----------------
*args, **kwargs
Arguments to pass to Stream constructor
Returns
-------
Stream or Stream subclass instance
Open stream
See Also
--------
:meth:`InputStream.to_file`
"""
if not isinstance(file, _sf.SoundFile):
file = _sf.SoundFile(file)
if kwargs.get('samplerate', None) is None:
kwargs['samplerate'] = file.samplerate
if kwargs.get('channels', None) is None:
kwargs['channels'] = file.channels
return cls(*args, **kwargs)
[docs] def wait(self, timeout=None):
"""Block until stream state changes to finished/aborted/stopped or until the
optional timeout occurs.
Parameters
----------
time : float, optional
Optional timeout in seconds.
Returns
-------
bool
True unless the timeout occurs.
"""
with self.__statecond:
if self.__state == 0:
self.__statecond.wait(timeout)
if self.__state:
# make sure any reader/writer threads are done before returning!
# TODO: need to think this through for corner cases...
self.__stopiothreads()
self._reraise_exceptions()
return self.__state > 0
@property
def isduplex(self):
"""Return whether this is a full duplex stream or not"""
return hasattr(self.channels, '__len__')
@property
def aborted(self):
"""Check whether stream has been aborted.
If True, it is guaranteed that the stream is in a finished state.
"""
return self.__state & _ABORTED > 0
@property
def finished(self):
"""Check whether the stream is in a finished state.
Will only be True if :meth:`start` has been called and the stream
either completed sucessfully or was stopped/aborted.
"""
return self.__state & _FINISHED > 0
@property
def status(self):
"""\
The current PaStreamCallbackFlags status of the portaudio
stream.
"""
return self._cstream.status
@property
def xruns(self):
"""Running total of xruns.
Each new starting of the stream resets this number to zero.
"""
return self._cstream.xruns
@property
def frame_count(self):
"""Running total of frames that have been processed.
Each new starting of the stream resets this number to zero.
"""
return self._cstream.frame_count
@property
def _offset(self):
return self._cstream.offset
@_offset.setter
def _offset(self, value):
self._cstream.offset = value
@property
def _pad(self):
return self._cstream.pad
@_pad.setter
def _pad(self, value):
# Note that the py_pastream callback doesn't act on 'pad' unless
# frames < 0; thus, set 'frames' first to get deterministic behavior.
frames = self.__frames
if frames >= 0 and value >= 0:
self._cstream.frames = value + frames
# reset autoframes whenever we set frames from here
self._cstream._autoframes = 0
self._cstream.pad = value
@property
def _frames(self):
# We fib a bit here: __frames is _cstream.frames minus any padding
return self.__frames
@_frames.setter
def _frames(self, value):
pad = self._cstream.pad
if value > 0 and pad > 0:
self._cstream.frames = value + pad
else:
self._cstream.frames = value
self.__frames = value
# reset autoframes whenever we set frames from here
self._cstream._autoframes = 0
def __enter__(self):
return self
def __exit__(self, exctype, excvalue, exctb):
self.close()
def _prepare(self):
assert self.__state, "Stream has already been started!"
# Apparently when using a PaStreamFinishedCallback the stream
# *must* be stopped before starting the stream again or the
# streamFinishedCallback will never be called
if self.__state != _INITIALIZED and not self.stopped:
super(Stream, self).stop()
# Reset stream state machine
with self.__statecond:
self.__state = 0
# Reset cstream info
_lib.reset_stream(self._cstream)
# Recreate the necessary threads
if self._rxthread_args is not None:
target, args, kwargs = self._rxthread_args
self._rxthread = _threading.Thread(target=self._readwritewrapper,
args=(target,) + args,
kwargs=kwargs)
self._rxthread.daemon = True
else:
self._rxthread = None
if self._txthread_args is not None:
target, args, kwargs = self._txthread_args
self._txthread = _threading.Thread(target=self._readwritewrapper,
args=(target,) + args,
kwargs=kwargs)
self._txthread.daemon = True
else:
self._txthread = None
# start is *not* thread safe! shouldn't have multiple callers anyway
[docs] def start(self, prebuffer=True):
"""Start the audio stream
Parameters
----------
prebuffer : bool or int, optional
Wait for a number of frames to be written to the output
buffer before starting the audio stream. If True is given
just wait for the first write. If not using threads or the
stream is not an output stream this has no effect.
"""
self._prepare()
## global gtime
## gtime = timer()
if self._txthread is not None:
self._txthread.start()
txbuffer = self._cstream.txbuffer
prebuffer = int(prebuffer)
while _lib.PaUtil_GetRingBufferReadAvailable(txbuffer) < prebuffer\
and self._txthread.is_alive():
_time.sleep(0.0025)
self._reraise_exceptions()
with self.__streamlock:
super(Stream, self).start()
if self._rxthread is not None:
self._rxthread.start()
self._reraise_exceptions()
[docs] def stop(self):
with self.__streamlock:
super(Stream, self).stop()
self.__stopiothreads()
self._reraise_exceptions()
[docs] def abort(self):
with self.__statecond:
self.__aborting = True
with self.__streamlock:
super(Stream, self).abort()
self.__stopiothreads()
self._reraise_exceptions()
[docs] def close(self):
# we take special care here to abort the stream first so that
# the pastream pointer is still valid for the lifetime of the
# read/write threads
if not self.finished:
with self.__statecond:
self.__aborting = True
with self.__streamlock:
super(Stream, self).abort()
self.__stopiothreads()
with self.__streamlock:
super(Stream, self).close()
# Drop references to any buffers and external objects
self._txthread_args = self._rxthread_args = None
self._rxbuffer = self._txbuffer = None
self._crefs = []
self._reraise_exceptions()
def __repr__(self):
try:
if self.device[0] == self.device[1]:
name = "'%s'" % _sd.query_devices(self.device)['name']
else:
name = tuple(_sd.query_devices(d)['name'] for d in self.device)
except TypeError:
name = "'%s'" % _sd.query_devices(self.device)['name']
try:
if self.channels[0] != self.channels[1]:
channels = self.channels
else:
channels = self.channels[0]
except TypeError:
channels = self.channels
if self.dtype[0] == self.dtype[1]:
# this is a hack that works only because there are no dtypes that
# start with the same two characters
dtype = self.dtype[0]
else:
dtype = self.dtype
if _sys.version_info.major < 3:
name = name.encode('utf-8', 'replace')
return ("{0.__name__}({1}, samplerate={2._samplerate:.0f}, "
"channels={3}, dtype='{4}', blocksize={2._blocksize})").format(
self.__class__, name, self, channels, dtype)
# Mix-in purely for adding playback methods
class _OutputStreamMixin(object):
@property
def txmapping(self):
"""Mutable mapping of input data channel to output channels
A mutable property specifying what input data channels are mapped to
each output channel. Note that input channels may only be mapped once
(i.e., channel duplication is not supported). Can also be specified as
a dict of input:output channel key value pairs.
See Also
--------
InputStream.rxmapping
"""
return list(self._cstream.txmapping[0:self._cstream.txchannels])
@txmapping.setter
def txmapping(self, mapping):
if mapping is None:
mapping = range(1, self._cstream.txchannels + 1)
cmapping = _get_cmapping(mapping, self._cstream.txchannels)
self._cstream.txmapping[0:len(cmapping)] = cmapping
def set_source(self, source, loop=False, buffersize=None, args=(), kwargs={}):
"""Set the playback source for the audio stream
Parameters
-----------
source : function or RingBuffer or SoundFile or buffer type
Playback source. If `source` is a function it must be of the form:
``function(stream, ringbuffer, *args, loop=<bool>,**kwargs)``.
Funcion sources are useful if you want to handle generating
playback in some custom way. For example, `source` could be a
function that reads audio data from a socket. This function will be
called from a separate thread whenever the stream is started and is
expected to close itself whenever the stream becomes inactive. For
an example see the ``_soundfileplayer`` function in the source code
for this module.
loop : bool, optional
Whether to enable playback looping.
buffersize : int, optional
RingBuffer size to use for double buffering audio data. Only
applicable if `source` is a function or SoundFile. Must be a power
of 2.
Other Parameters
-----------------
args, kwargs
Additional arguments to pass if `source` is a function.
Returns
-------
RingBuffer instance
RingBuffer wrapper interface from which audio device will read audio data.
See Also
--------
:meth:`InputStream.set_sink`
"""
try:
channels = self.channels[1]
elementsize = channels * self.samplesize[1]
except TypeError:
channels = self.channels
elementsize = channels * self.samplesize
if buffersize is None:
buffersize = _PA_BUFFERSIZE
writer = None
if isinstance(source, _sf.SoundFile):
writer = self._soundfileplayer
if source.samplerate != self.samplerate or source.channels != channels:
raise ValueError("Playback file samplerate/channels mismatch")
if loop and not source.seekable():
raise ValueError("Can't loop playback; file is not seekable")
args, kwargs = (source,), {}
elif isinstance(source, RingBuffer):
buffer = source
elif callable(source):
writer = source
else:
buffer = _LinearBuffer(elementsize, source)
buffer.advance_write_index(len(buffer))
if writer is not None:
# Only allocate a new buffer if an appropriate one is not already assigned
buffer = self._txbuffer
if buffer is None or len(buffer) != buffersize:
buffer = RingBuffer(elementsize, buffersize)
else:
buffer.flush()
# Assume the writer will take care of looping
self._cstream.loop = 0
kwargs['loop'] = loop
self._txthread_args = writer, (buffer,) + args, kwargs
else:
self._cstream.loop = loop
# null out any thread that was previously set
self._txthread_args = None
self._cstream.txbuffer = _ffi.cast('PaUtilRingBuffer*', buffer._ptr)
self._txbuffer = buffer
return buffer
def play(self, playback, frames=-1, pad=0, loop=False, buffersize=None, blocking=False):
"""Play back audio data from a buffer or file
Parameters
-----------
playback : buffer or SoundFile
Playback source.
frames : int, optional
Number of frames to play. (Note: This does *not* include the length of
any additional padding). A negative value (the default) will cause the
stream to continue until the send buffer is empty.
pad : int, optional
Number of zero frames to pad the playback with. A negative value causes
padding to be automatically chosen so that the total playback length
matches `frames` (or, if frames is negative, zero padding will be added
indefinitely).
blocking : bool, optional
Wait for playback to finish before returning.
Other Parameters
-----------------
loop, buffersize
See :meth:`OutputStream.set_source`
See Also
--------
:meth:`OutputStream.set_source`, :meth:`DuplexStream.playrec`
"""
# Null out any rx thread or rx buffer pointer but note that we
# intentionally do not clear the _rxbuffer in case that memory could be
# used again
self._rxthread_args = None
self._cstream.rxbuffer = _ffi.NULL
self.set_source(playback, loop, buffersize)
self._pad = pad
self._frames = frames
self.start()
if blocking:
self.wait()
# Mix-in purely for adding recording methods
class _InputStreamMixin(object):
@property
def rxmapping(self):
"""Mutable mapping of input data channel to output channels
A mutable property specifying what input data channels are mapped to
each output channel. Note that input channels may only be mapped once
(i.e., channel mixing is not supported). Can also be specified as a
dict of input:output channel key value pairs.
See Also
--------
OutputStream.txmapping
"""
return list(self._cstream.rxmapping[0:self._cstream.rxchannels])
@rxmapping.setter
def rxmapping(self, mapping):
if mapping is None:
mapping = range(1, self._cstream.rxchannels + 1)
cmapping = _get_cmapping(mapping, self._cstream.rxchannels)
self._cstream.rxmapping[0:len(cmapping)] = cmapping
def to_file(self, file, mode='w', **kwargs):
"""Open a SoundFile for writing based on stream properties
Parameters
----------
file : str or int or file-like object
File to open as SoundFile
**kwargs
Additional arguments to pass to SoundFile constructor
Raises
------
TypeError
If no subtype was given and an appropriate subtype could
not be guessed.
Returns
-------
SoundFile
See Also
--------
:meth:`Stream.from_file`
"""
# Try and determine the file extension here; we need to know if we
# want to try and set a default subtype for the file
fformat = kwargs.pop('format', None)
if fformat is None:
try:
fformat = getattr(file, 'name', file).rsplit('.', 1)[1].lower()
except (AttributeError, IndexError):
fformat = None
try:
channels = self.channels[0]
ssize = self.samplesize[0]
dtype = self.dtype[0]
except TypeError:
channels = self.channels
ssize = self.samplesize
dtype = self.dtype
if kwargs.get('samplerate', None) is None:
kwargs['samplerate'] = int(self.samplerate)
if kwargs.get('channels', None) is None:
kwargs['channels'] = channels
subtype = kwargs.pop('subtype', None)
endian = kwargs.get('endian', None)
if not subtype:
# For those file formats which support PCM or FLOAT, use the device
# samplesize to make a guess at a default subtype
if dtype == 'float32' and _sf.check_format(fformat, 'float', endian):
subtype = 'float'
else:
subtype = 'pcm_{0}'.format(8 * ssize)
if fformat and not _sf.check_format(fformat, subtype, endian):
raise TypeError("Could not map stream datatype '{0}' to "
"an appropriate subtype for '{1}' format; please specify"
.format(dtype, fformat))
return _sf.SoundFile(file, mode, subtype=subtype, format=fformat, **kwargs)
def set_sink(self, sink, buffersize=None, args=(), kwargs={}):
"""Set the recording sink for the audio stream
Parameters
-----------
sink : function or RingBuffer or SoundFile or buffer type
Recording sink. If `sink` is a function it must be of the form:
``function(stream, ringbuffer, *args, **kwargs)``. Funcion sources
are useful if you want to handle capturing of audio data in some
custom way. For example, `sink` could be a function that writes
audio data directly to a socket. This function will be called from
a separate thread whenever the stream is started and is expected to
close itself whenever the stream becomes inactive. For an example
see the ``_soundfilerecorder`` function in the source code for this
module.
buffersize : int, optional
RingBuffer size to use for (double) buffering audio data. Only
applicable when `sink` is either a file or function. Must be a
power of 2.
args, kwargs
Additional arguments to pass if `sink` is a function.
Returns
-------
RingBuffer instance
RingBuffer wrapper interface to which audio device will write audio data.
See Also
--------
:meth:`OutputStream.set_source`
"""
try:
channels = self.channels[0]
elementsize = channels * self.samplesize[0]
except TypeError:
channels = self.channels
elementsize = channels * self.samplesize
if buffersize is None:
buffersize = _PA_BUFFERSIZE
reader = None
if isinstance(sink, _sf.SoundFile):
reader = self._soundfilerecorder
if sink.samplerate != self.samplerate or sink.channels != channels:
raise ValueError("Recording file samplerate/channels mismatch")
args, kwargs = (sink,) + args, {}
elif isinstance(sink, RingBuffer):
buffer = sink
elif callable(sink):
reader = sink
else:
buffer = _LinearBuffer(elementsize, sink)
if reader is not None:
# Only allocate a new buffer if an appropriate one is not already assigned
buffer = self._rxbuffer
if buffer is None or len(buffer) != buffersize:
buffer = RingBuffer(elementsize, buffersize)
else:
buffer.flush()
self._rxthread_args = reader, (buffer,) + args, kwargs
else:
# null out any thread that was previously set
self._rxthread_args = None
self._cstream.rxbuffer = _ffi.cast('PaUtilRingBuffer*', buffer._ptr)
self._rxbuffer = buffer
return buffer
def record(self, frames=None, offset=0, atleast_2d=False, buffersize=None, blocking=False, out=None):
"""Record audio data to a buffer or file
Parameters
-----------
frames : int, sometimes optional
Number of frames to record. Can be omitted if `out` is specified.
offset : int, optional
Number of frames to discard from beginning of recording.
atleast_2d : bool, optional
(numpy only) Always return output as a 2 dimensional array.
buffersize : int, optional
Buffer size to use for (double) buffering audio data to file. Only
applicable when `out` is a file. Must be a power of 2.
blocking : bool, optional
Wait for recording to finish before returning.
out : buffer or SoundFile, optional
Output sink.
Returns
-------
ndarray or bytearray or type(out)
Recording destination.
See Also
--------
:meth:`InputStream.set_sink`, :meth:`DuplexStream.playrec`
"""
try:
import numpy
except ImportError:
numpy = None
if frames is None and out is None:
raise TypeError("at least one of {frames, out} is required")
if out is None:
out = self._allocate_buffer(frames - offset, 'input', atleast_2d)
if atleast_2d and (numpy is None or not isinstance(out, numpy.ndarray)):
raise ValueError("atleast_2d is only supported with numpy arrays")
# Null out any previously set tx buffer/threads (see comment in play())
self._txthread_args = None
self._cstream.txbuffer = _ffi.NULL
capture = self.set_sink(out, buffersize)
isbuffer = not isinstance(out, _sf.SoundFile)
if frames is not None:
self._frames = frames
elif isbuffer:
self._frames = len(capture) + offset
else:
self._frames = -1
self._offset = offset
self.start()
if blocking:
self.wait()
return out
#TODO add ability to pad out last chunk so it's the same length as the rest
#TODO? if `out` is set, modify the chunksize to use the length of the buffer
#TODO? when out is used have pypastream write to it directly, avoiding the copy
def chunks(self, chunksize=None, overlap=0, frames=-1, pad=-1, offset=0,
atleast_2d=False, playback=None, loop=False, buffersize=None,
out=None):
"""Read audio data in iterable chunks from a Portaudio stream.
Similar in concept to PySoundFile library's
:meth:`~soundfile.SoundFile.blocks` method. Returns an iterator over
buffered audio chunks read from a Portaudio stream. By default a
direct view into the stream's ringbuffer is returned whenever
possible. Setting an `out` buffer will of course incur an extra copy.
Parameters
----------
chunksize : int, optional
Size of iterator chunks. If not specified the stream blocksize will
be used. Note that if the blocksize is zero the yielded audio
chunks may be of variable length depending on the audio backend.
overlap : int, optional
Number of frames to overlap across blocks.
frames : int, optional
Number of frames to play/record.
pad : int, optional
Playback padding. See :meth:`OutputStream.play`. Only applicable
when playback is given.
offset : int, optional
Recording offset. See :meth:`InputStream.record`.
atleast_2d : bool, optional
(numpy only) Always return chunks as 2 dimensional arrays.
playback : buffer or SoundFile, optional
Set playback audio. Only works for full duplex streams.
loop : bool, optional
Loop the playback audio.
out : :class:`~numpy.ndarray` or buffer object, optional
Alternative output buffer in which to store the result. Note that
any buffer object - with the exception of :class:`~numpy.ndarray` -
is expected to have single-byte elements as would be provided by
e.g., ``bytearray``.
Yields
------
:class:`~numpy.ndarray` or memoryview or cffi.buffer
Buffer object with `chunksize` frames. If numpy is available
defaults to :class:`~numpy.ndarray` otherwise a buffer of bytes is
yielded (which is either a :class:`cffi.buffer` object or a
``memoryview``).
See Also
--------
:meth:`chunks`
"""
try:
channels = self.channels[0]
samplesize = self.samplesize[0]
dtype = self.dtype[0]
latency = self.latency[0]
except TypeError:
channels = self.channels
samplesize = self.samplesize
dtype = self.dtype
latency = self.latency
elementsize = channels * samplesize
try:
import numpy
except ImportError:
numpy = None
if atleast_2d and (numpy is None or not isinstance(out, numpy.ndarray)):
raise ValueError("atleast_2d is only supported with numpy arrays")
periodsize = int(round(latency * self.samplerate))
varsize = False
if not chunksize:
if self.blocksize:
chunksize = self.blocksize - overlap
elif overlap:
raise ValueError(
"Using overlap requires a non-zero chunksize or stream blocksize")
else:
varsize = True
chunksize = periodsize
if overlap >= chunksize:
raise ValueError(
"Overlap must be less than chunksize or stream blocksize")
if buffersize is None:
buffersize = _PA_BUFFERSIZE
# Allocate a ringbuffer for double buffering input
rxbuffer = self._rxbuffer
if not isinstance(rxbuffer, RingBuffer) or len(rxbuffer) != buffersize:
rxbuffer = RingBuffer(elementsize, buffersize)
else:
rxbuffer.flush()
self._cstream.rxbuffer = _ffi.cast('PaUtilRingBuffer*', rxbuffer._ptr)
self._rxbuffer = rxbuffer
# Clear any previous receive thread
self._rxthread_args = None
if playback is None:
self._txthread_args = None
self._cstream.txbuffer = _ffi.NULL
elif not self.isduplex:
raise ValueError("playback not supported; this stream is input only")
elif playback is True:
pass
else:
self.set_source(playback, loop, buffersize)
ndarray = False
if out is not None:
tempbuff = out
if numpy is not None and isinstance(out, numpy.ndarray):
ndarray = True
try: bytebuff = tempbuff.data.cast('B')
except AttributeError: bytebuff = tempbuff.data
else:
bytebuff = tempbuff
elif numpy is None:
if varsize: nbytes = len(rxbuffer) * elementsize
else: nbytes = chunksize * elementsize
# Indexing into a bytearray creates a copy, so wrap it with a
# memoryview
bytebuff = tempbuff = memoryview(bytearray(nbytes))
else:
ndarray = True
if varsize: nframes = len(rxbuffer)
else: nframes = chunksize
if channels > 1: atleast_2d = True
tempbuff = numpy.zeros((nframes, channels) if atleast_2d
else nframes * channels, dtype=dtype)
try: bytebuff = tempbuff.data.cast('B')
except AttributeError: bytebuff = tempbuff.data
# fill the first overlap block with zeros
if overlap:
rxbuffer.write(bytearray(overlap * elementsize))
# DEBUG
## logf = open('chunks2.log', 'wt')
## print("delta sleep lag yield misses frames", file=logf)
## starttime = dt = rmisses = 0
wait_time = leadtime = 0
done = False
minframes = 1 if varsize else chunksize
self._offset = offset
self._frames = frames
self._pad = pad
self.start()
try:
sleeptime = latency - \
(rxbuffer.read_available - offset - overlap) / self.samplerate
if sleeptime > 0:
_time.sleep(sleeptime)
while not (self.aborted or done):
# for thread safety, check the stream is active *before* reading
active = self.active
frames = rxbuffer.read_available
lastTime = self._cstream.lastTime.currentTime
if frames < minframes:
if not wait_time:
wait_time = self.time
if not active:
done = True
if frames == 0: break
else:
## self._rmisses += 1
_time.sleep(0.0025)
continue
elif wait_time:
leadtime = lastTime - wait_time
wait_time = 0
# Debugging only
## print("{0:f} {1:f} {2:f} {3:f} {4} {5}".format(
## 1e3*(_time.time() - starttime), 1e3*sleeptime, 1e3*(self.time - lastTime),
## 1e3*dt, self._rmisses - rmisses, frames - chunksize), file=logf)
## rmisses = self._rmisses
## starttime = _time.time()
frames, buffregn1, buffregn2 = rxbuffer.get_read_buffers(
frames if varsize else chunksize)
if out is not None or len(buffregn2):
buffsz1 = len(buffregn1)
bytebuff[:buffsz1] = buffregn1
bytebuff[buffsz1:buffsz1 + len(buffregn2)] = buffregn2
rxbuffer.advance_read_index(frames - overlap)
if not ndarray:
yield bytebuff[:frames * elementsize]
else:
yield tempbuff[:frames]
else:
if atleast_2d:
yield numpy.frombuffer(buffregn1, dtype=dtype)\
.reshape(frames, channels)
elif ndarray:
yield numpy.frombuffer(buffregn1, dtype=dtype)
else:
yield buffregn1
rxbuffer.advance_read_index(frames - overlap)
# DEBUG
## print('1', timer() - gtime, rxbuffer.read_available)
## dt = _time.time() - starttime
sleeptime = (chunksize - rxbuffer.read_available) / self.samplerate \
+ self._cstream.lastTime.currentTime - self.time \
+ leadtime
if sleeptime > 0:
_time.sleep(sleeptime)
except Exception:
if _sd._initialized:
self.abort()
raise
else:
self._reraise_exceptions()
finally:
if _sd._initialized and self._autoclose:
self.close()
[docs]class OutputStream(_OutputStreamMixin, Stream):
"""Playback only stream.
Other Parameters
----------------
*args, **kwargs
Arguments to pass to :class:`Stream`.
"""
def __init__(self, *args, **kwargs):
super(OutputStream, self).__init__('output', *args, **kwargs)
[docs]class DuplexStream(InputStream, OutputStream):
"""Full duplex audio streamer.
Other Parameters
----------------
*args, **kwargs
Arguments to pass to :class:`Stream`.
See Also
--------
:class:`OutputStream`, :class:`InputStream`
"""
def __init__(self, *args, **kwargs):
Stream.__init__(self, 'duplex', *args, **kwargs)
[docs] @classmethod
def from_file(cls, playback, *args, **kwargs):
"""Open a stream using the properties of a playback Soundfile
Parameters
----------
playback : SoundFile or str or int or file-like object
Playback audio file from which to create stream.
Other Parameters
----------------
*args, **kwargs
Arguments to pass to Stream constructor
Returns
-------
Stream
Open stream
See Also
--------
:meth:`InputStream.to_file`
"""
if not isinstance(playback, _sf.SoundFile):
playback = _sf.SoundFile(playback)
if kwargs.get('samplerate', None) is None:
kwargs['samplerate'] = playback.samplerate
channels = kwargs.pop('channels', None)
try:
kwargs['channels'] = (channels[0], channels[1] or playback.channels)
except TypeError:
kwargs['channels'] = (channels, playback.channels)
return cls(*args, **kwargs)
[docs] def playrec(self, playback, frames=None, pad=0, offset=0, atleast_2d=False,
loop=False, buffersize=None, blocking=False, out=None):
"""Simultaneously record and play audio data
Parameters
-----------
playback : buffer or SoundFile
Playback source.
frames : int, sometimes optional
Number of frames to play/record. This is required whenever
`playback` is a file and `out` is not given.
buffersize : int
Buffer size to use for (double) buffering audio data to/from
file. Only applicable when one or both of {`playback`, `out`} is a
file. Must be a power of 2.
pad, offset, atleast_2d, loop, blocking, out
See description of :meth:`InputStream.record` and
:meth:`OutputStream.play`.
Returns
-------
ndarray or bytearray or type(out)
Recording destination.
See Also
--------
:meth:`OutputStream.play`, :meth:`InputStream.record`
"""
try:
import numpy
except ImportError:
numpy = None
self.set_source(playback, loop, buffersize)
# if playback is a file with no determined length we could get a
# nonsensical value which may be negative or a huge number; thus we
# can't rely on it
if frames is None and out is None and isinstance(playback, _sf.SoundFile):
raise TypeError("at least one of {frames, out} is required when playback is a file")
if out is None:
if frames is None:
frames = len(playback)
if frames < offset:
raise ValueError("frames must be >= offset")
out = self._allocate_buffer(frames - offset + (pad if pad >= 0 else 0), 'input')
if atleast_2d and (numpy is None or not isinstance(out, numpy.ndarray)):
raise ValueError("atleast_2d is only supported with numpy arrays")
capture = self.set_sink(out, buffersize)
isbuffer = not isinstance(out, _sf.SoundFile)
if frames is not None:
self._frames = frames
elif isbuffer:
self._frames = len(capture) + offset
else:
self._frames = -1
self._offset = offset
self._pad = pad
self.start()
if blocking:
self.wait()
return out
[docs]def chunks(chunksize=None, overlap=0, frames=-1, pad=0, offset=0,
atleast_2d=False, playback=None, loop=False, buffersize=None,
out=None, **kwargs):
"""Read audio data in iterable chunks from a Portaudio stream.
This is simply a convenience function that provides the same functionality
as :meth:`InputStream.chunks`.
Parameters
------------
chunksize, overlap, frames, pad, offset, atleast_2d, playback, loop,
buffersize, out
See :meth:`InputStream.chunks` for description.
Other Parameters
-----------------
**kwargs
Additional arguments to pass to Stream constructor.
Yields
-------
ndarray or bytearray or type(out)
buffer object with `chunksize` elements.
See Also
--------
:meth:`InputStream.chunks`
"""
if playback is not None:
if isinstance(playback, _sf.SoundFile):
stream = DuplexStream.from_file(playback, **kwargs)
else:
stream = DuplexStream(**kwargs)
else:
stream = InputStream(**kwargs)
stream._autoclose = True
return stream.chunks(chunksize, overlap, frames, pad, offset, atleast_2d,
playback, loop, buffersize, out)
# Used solely for the pastream app
def _FileStreamFactory(record=None, playback=None, buffersize=None, loop=False,
rxmapping=None, txmapping=None, **kwargs):
frames = kwargs.pop('frames')
pad = kwargs.pop('pad')
offset = kwargs.pop('offset')
ichannels, ochannels = _sd._split(kwargs.get('channels', None))
iformat, oformat = _sd._split(kwargs.pop('format'))
isubtype, osubtype = _sd._split(kwargs.pop('subtype'))
iendian, oendian = _sd._split(kwargs.pop('endian'))
playback_fh = None
if playback is not None:
try:
playback_fh = _sf.SoundFile(playback)
except TypeError:
playback_fh = _sf.SoundFile(playback, kwargs['samplerate'],
ochannels, osubtype, oendian,
oformat)
if record is not None and playback is not None:
kind = 'duplex'
stream = DuplexStream.from_file(playback_fh, **kwargs)
stream.rxmapping = rxmapping
stream.txmapping = txmapping
record_fh = stream.to_file(record, subtype=isubtype, endian=iendian, format=iformat)
stream.set_source(playback_fh, loop, buffersize)
stream.set_sink(record_fh, buffersize)
elif playback is not None:
kind = 'output'
record_fh = None
stream = OutputStream.from_file(playback_fh, **kwargs)
stream.txmapping = txmapping
stream.set_source(playback_fh, loop, buffersize)
elif record is not None:
kind = 'input'
playback_fh = None
stream = InputStream(**kwargs)
stream.rxmapping = rxmapping
record_fh = stream.to_file(record, subtype=isubtype, endian=iendian, format=iformat)
stream.set_sink(record_fh, buffersize)
else:
raise TypeError("At least one of {playback, record} must be non-null.")
# frames/pad/offset can all be specified in seconds (ie a multiple of
# samplerate) so set them here after the stream is opened
locs = locals()
for k in ('frames', 'pad', 'offset'):
v = locs[k]
if v is None: continue
if isinstance(v, str): # parse the format H:M:S to number of seconds
seconds = sum(x * (60.0 ** i) for i, x in enumerate(v.split(':')[::-1]))
v = int(round(seconds * stream.samplerate))
setattr(stream, '_' + k, v)
origclose = stream.close
def close():
try:
origclose()
finally:
if playback is not None: playback_fh.close()
if record is not None: record_fh.close()
stream.close = close
return stream, record_fh, playback_fh, kind
def _get_parser(parser=None):
import shlex, re
from argparse import Action, ArgumentParser, RawDescriptionHelpFormatter
# Hour:minute:second format
hms = re.compile(r'^\s*([0-9]+:)?([0-5]?[0-9]:)?[0-5]?[0-9](\.[0-9]+)?\s*$')
if parser is None:
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
add_help=False, fromfile_prefix_chars='@',
description=__doc__)
parser.convert_arg_line_to_args = lambda arg_line: arg_line.split()
class ListStreamsAction(Action):
def __call__(*args, **kwargs):
print(_sd.query_devices())
_sys.exit(0)
def framestype(frames):
if frames.endswith('s'):
return sizetype(frames[:-1])
elif frames == '-1':
return -1
elif not hms.match(frames):
raise ValueError("Couldn't parse argument: %s" % frames)
return frames
def dvctype(dvc):
try: return int(dvc)
except ValueError: return dvc
def sizetype(x):
if x.endswith('k'): x = int(float(x[:-1]) * 1e3)
elif x.endswith('K'): x = int(float(x[:-1]) * 1024)
elif x.endswith('m'): x = int(float(x[:-1]) * 1e6)
elif x.endswith('M'): x = int(float(x[:-1]) * 1024 * 1024)
else: x = int(float(x))
return x
def possizetype(x):
x = sizetype(x)
assert x > 0, "Must be a positive value."
return x
def nonnegsizetype(x):
x = sizetype(x)
assert x >= 0, "Must be a non-negative value."
return x
def posframestype(x):
if x.startswith('-'):
raise ValueError("Must be a non-negative value.")
return framestype(x)
def nullortype(x, type=None):
if x.lower() in ('null', '', '^'):
return None
return x if type is None else type(x)
def csvtype(arg, type=None):
csvsplit = shlex.shlex(arg, posix=True)
csvsplit.whitespace = ','; csvsplit.whitespace_split = True
return tuple(map(type or str, csvsplit))
def maptype(arg):
mapping = []
for i, x in enumerate(arg.split(','), 1):
try:
a, b = x.split(':')
a, b = int(a), int(b)
assert b > 0, "bad channel mapping; output channel must always be > 0"
except ValueError:
a, b = int(x), i
mapping.append((a, b))
return [x[0] for x in sorted(mapping, key=lambda y: y[1])]
parser.add_argument("input", type=nullortype, metavar='input|NULL',
help='''\
Playback audio file. Use dash (-) to read from STDIN. Use one of {null, ^} or
an empty string ('') to select record only.''')
parser.add_argument("output", type=nullortype, metavar='output|NULL', nargs='?',
help='''\
Output file for recording. Use dash (-) to write to STDOUT. Use one of {null,
^} or an empty string ('') to select playback only.''')
genopts = parser.add_argument_group("general options")
genopts.add_argument("-h", "--help", action="help",
help="Show this help message and exit.")
genopts.add_argument("-l", "--list", action=ListStreamsAction, nargs=0,
help="List available audio device streams.")
genopts.add_argument("-q", "--quiet", action='store_true',
help="Don't print any status information.")
genopts.add_argument("--version", action='version',
version='%(prog)s ' + __version__, help="Print version and exit.")
propts = parser.add_argument_group("playback/record options",
description='''\
Note that size type arguments are accepted in the form hours:minutes:seconds by
default or in samples directly by appending an 's' suffix lead by an optional
size suffix: k[ilo] K[ibi] m[ega] M[ebi]. (e.g. 1Ks == 1024 samples).''')
propts.add_argument("--buffersize", type=possizetype, default=_PA_BUFFERSIZE,
help='''\
File buffering size in units of frames. Must be a power of 2. Determines the
maximum amount of buffering for the input/output file(s). Use higher values to
increase robustness against irregular file i/o behavior. (Default
%(default)d)''')
propts.add_argument("-d", "--duration", type=framestype, default=-1,
help='''\
Limit playback/capture to a certain duration. If duration is negative (the
default), then streaming will continue until there is no playback data
remaining or, if no playback was given, recording will continue
indefinitely.''')
propts.add_argument("--fatal-xruns", action='store_true',
help="Exit with an error if any xruns are reported.")
propts.add_argument("--loop", action='store_true',
help="Loop playback indefinitely.")
propts.add_argument("-o", "--offset", type=posframestype, default=0,
help='''\
Drop a number of frames from the start of a recording.''')
propts.add_argument("-p", "--pad", type=framestype, nargs='?', default=0,
const=-1, help='''\
Pad the input with frames of zeros. (Useful to avoid truncating full duplex
recordings). If pad is negative (the default if no argument is given) then
padding is chosen so that the total playback length matches --duration. If
duration is also negative, zero padding will be added indefinitely.''')
devopts = parser.add_argument_group("audio device options",
description='''\
Options accept single values or pairs. One of {null, ^) or an empty string
('') may be used as a placeholder for the default value.''')
devopts.add_argument("-b", "--blocksize", type=nonnegsizetype, help='''\
PortAudio buffer size in units of frames. If blocksize is zero,
backend will decide an optimal size (recommended). Default is zero.''')
devopts.add_argument("-c", "--channels", metavar='channels[,channels]',
type=lambda x: csvtype(x, lambda y: nullortype(y, int)),
help="Number of input/output channels.")
devopts.add_argument("-D", "--device", metavar='device[,device]',
type=lambda x: csvtype(x, lambda y: nullortype(y, dvctype)),
help='''\
Audio device name expression(s) or index number(s). Defaults to the
PortAudio default device(s).''')
devopts.add_argument("-f", "--format", metavar="format[,format]",
dest='dtype', type=lambda x: csvtype(x, nullortype),
help='''\
Sample format(s) of audio device stream. Must be one of {%s}.'''
% ', '.join(['null'] + list(_sd._sampleformats.keys())))
devopts.add_argument("-m", "--map", type=lambda x: nullortype(x, maptype),
default=[], nargs='+', metavar='CHMAP', help='''\
Input to output channel mapping assuming one-based indexing in the form
CHIN[:CHOUT][,CHIN[:CHOUT]...]. If the output channel number is omitted the
positional index is assumed. Note that each input/output channel may only be
mapped once (i.e., mixing channels and channel duplication are not
supported).''')
devopts.add_argument("-r", "--rate", dest='samplerate', type=possizetype,
help='''\
Sample rate in Hz. Add a 'k' suffix to specify kHz.''')
devopts.add_argument("-s", "--channel-select", metavar='channel[,channel,...]',
type=lambda x: nullortype(x, lambda y: csvtype(y, possizetype)), nargs='+',
help='''\
Channel select; open a subset of audio device's available channels. Expects a
comma-delimited list of one-based channel indices. Available with ASIO
backend only.''')
fileopts = parser.add_argument_group("audio file formatting options",
description='''\
Options accept single values or pairs. One of {null, ^} or an empty string
('') may be used as a placeholder for the default value.''')
fileopts.add_argument("-t", "--file_type", metavar="file_type[,file_type]",
type=lambda x: csvtype(x, lambda y: nullortype(y, str.upper)),
help='''\
Audio file type(s). (Required for RAW files). Typically this is determined
from the file header or extension, but it can be manually specified here. Must
be one of {%s}.''' % ', '.join(['null'] + list(_sf.available_formats().keys())))
fileopts.add_argument("-e", "--encoding", metavar="encoding[,encoding]",
type=lambda x: csvtype(x, lambda y: nullortype(y, str.upper)),
help='''\
Sample format encoding(s). Note for output file encodings: for file types that
support PCM or FLOAT format, pastream will automatically choose the sample
format that most closely matches the output device stream; for other file
types, the subtype is required. Must be one of {%s}.'''
% ', '.join(['null'] + list(_sf.available_subtypes().keys())))
fileopts.add_argument("--endian", metavar="endian[,endian]",
type=lambda x: csvtype(x, lambda y: nullortype(y, str.lower)),
help='''\
Sample endianness. Must be one of {%s}.''' % ', '.join(['null'] + ['file', 'big', 'little']))
return parser
def _main(argv=None):
import os, traceback
if argv is None:
argv = _sys.argv[1:]
parser = _get_parser()
args = parser.parse_args(argv)
# Note that input/output from the cli perspective is reversed wrt the
# pastream/portaudio library so we swap arguments here
def unpack(x):
return x[0] if x and len(x) == 1 else x and x[::-1]
settings = None
if args.channel_select:
try:
if args.channels[0] is None and args.input:
args.channels[0] = len(args.channel_select[0] or []) or None
if args.channels[1] is None and args.output and len(args.channel_select) > 1:
args.channels[1] = len(args.channel_select[1] or []) or None
except TypeError:
args.channels = [cs and len(cs) for cs in args.channel_select]
except IndexError:
if args.output and len(args.channel_select) > 1:
args.channels = (args.channels[0], len(args.channel_select[1] or []) or None)
settings = [cs and _sd.AsioSettings([x-1 for x in cs]) for cs in args.channel_select]
txmapping, rxmapping = None, None
if len(args.map) > 1:
txmapping, rxmapping = args.map
elif len(args.map):
if args.output: rxmapping = args.map[0]
else: txmapping = args.map[0]
try:
stream, record, playback, kind = _FileStreamFactory(
args.output, args.input,
buffersize=args.buffersize,
loop=args.loop,
offset=args.offset,
pad=args.pad,
frames=args.duration,
samplerate=args.samplerate,
blocksize=args.blocksize,
endian=unpack(args.endian),
subtype=unpack(args.encoding),
format=unpack(args.file_type),
device=unpack(args.device),
channels=unpack(args.channels),
dtype=unpack(args.dtype),
extra_settings=unpack(settings),
txmapping=txmapping, rxmapping=rxmapping)
except (TypeError, ValueError):
traceback.print_exc()
parser.print_usage()
parser.exit(255)
if args.output == '-' or args.quiet:
stdout = open(os.devnull, 'w')
else:
stdout = _sys.stdout
statline = "\r {:02.0f}:{:02.0f}:{:05.2f}s ({:d} xruns, {:6.2f}% load)\r"
print("<-", 'null' if playback is None else playback, file=stdout)
print("--", stream, file=stdout)
print("->", 'null' if record is None else record, file=stdout)
with stream:
try:
stream.start()
t1 = _time.time()
while stream.active:
dt = _time.time() - t1
line = statline.format(dt // 3600, dt % 3600 // 60, dt % 60,
stream.xruns,
100 * stream.cpu_load)
stdout.write(line); stdout.flush()
if args.fatal_xruns and stream.status:
# I've seen some really odd hanging behavior in older
# versions of portaudio using pulseaudio devices with
# abort(); stop() seems not to cause the same issues though
stream.stop()
break
_time.sleep(0.12)
except KeyboardInterrupt:
stream.stop()
finally:
print(file=stdout)
if args.fatal_xruns and stream.xruns:
print("ERROR: xruns detected: Aborted", file=_sys.stderr)
print("Callback info:", file=stdout)
print("\tFrames processed: %d ( %.3fs )"
% (stream.frame_count, stream.frame_count / float(stream.samplerate)),
file=stdout)
print('''\
\tinput xruns (under/over): {0.inputUnderflows}/{0.inputOverflows}
\toutput runs (under/over): {0.outputUnderflows}/{0.outputOverflows}'''
.format(stream._cstream), file=stdout)
## print(stream._rmisses, stream._wmisses, file=_sys.stderr)
return 0
if __name__ == '__main__':
_sys.exit(_main())