Source code for pa_ringbuffer

"""Python wrapper for PortAudio's ring buffer.

https://github.com/spatialaudio/python-pa-ringbuffer

"""
__version__ = '0.1.4'


def cdef():
    """Return C declarations needed by CFFI."""
    import platform

    if platform.system() == 'Darwin':
        ring_buffer_size_t = 'int32_t'
    else:
        ring_buffer_size_t = 'long'
    return """
/* Declarations from PortAudio's pa_ringbuffer.h: */
typedef %(ring_buffer_size_t)s ring_buffer_size_t;
typedef struct PaUtilRingBuffer
{
    ring_buffer_size_t  bufferSize;
    volatile ring_buffer_size_t writeIndex;
    volatile ring_buffer_size_t readIndex;
    ring_buffer_size_t bigMask;
    ring_buffer_size_t smallMask;
    ring_buffer_size_t elementSizeBytes;
    char* buffer;
} PaUtilRingBuffer;
ring_buffer_size_t PaUtil_InitializeRingBuffer(PaUtilRingBuffer* rbuf, ring_buffer_size_t elementSizeBytes, ring_buffer_size_t elementCount, void* dataPtr);
void PaUtil_FlushRingBuffer(PaUtilRingBuffer* rbuf);
ring_buffer_size_t PaUtil_GetRingBufferWriteAvailable(const PaUtilRingBuffer* rbuf);
ring_buffer_size_t PaUtil_GetRingBufferReadAvailable(const PaUtilRingBuffer* rbuf);
ring_buffer_size_t PaUtil_WriteRingBuffer(PaUtilRingBuffer* rbuf, const void* data, ring_buffer_size_t elementCount);
ring_buffer_size_t PaUtil_ReadRingBuffer(PaUtilRingBuffer* rbuf, void* data, ring_buffer_size_t elementCount);
ring_buffer_size_t PaUtil_GetRingBufferWriteRegions(PaUtilRingBuffer* rbuf, ring_buffer_size_t elementCount, void** dataPtr1, ring_buffer_size_t* sizePtr1, void** dataPtr2, ring_buffer_size_t* sizePtr2);
ring_buffer_size_t PaUtil_AdvanceRingBufferWriteIndex(PaUtilRingBuffer* rbuf, ring_buffer_size_t elementCount);
ring_buffer_size_t PaUtil_GetRingBufferReadRegions(PaUtilRingBuffer* rbuf, ring_buffer_size_t elementCount, void** dataPtr1, ring_buffer_size_t* sizePtr1, void** dataPtr2, ring_buffer_size_t* sizePtr2);
ring_buffer_size_t PaUtil_AdvanceRingBufferReadIndex(PaUtilRingBuffer* rbuf, ring_buffer_size_t elementCount);
/* End of pa_ringbuffer.h declarations. */
""" % locals()


def init(ffi, lib):
    """Return RingBuffer class using the given CFFI instance."""

    class RingBuffer(_RingBufferBase):
        __doc__ = _RingBufferBase.__doc__
        _ffi = ffi
        _lib = lib

    return RingBuffer


class _RingBufferBase(object):
    """PortAudio's single-reader single-writer lock-free ring buffer.

    C API documentation:
        http://portaudio.com/docs/v19-doxydocs-dev/pa__ringbuffer_8h.html
    Python wrapper:
        https://github.com/spatialaudio/python-pa-ringbuffer

    Instances of this class can be used to transport data between Python
    code and some compiled code running on a different thread.

    This only works when there is a single reader and a single writer
    (i.e. one thread or callback writes to the ring buffer, another
    thread or callback reads from it).

    This ring buffer is *not* appropriate for passing data from one
    Python thread to another Python thread.  For this, the `queue.Queue`
    class from the standard library can be used.

    :param elementsize: The size of a single data element in bytes.
    :type elementsize: int
    :param size: The number of elements in the buffer (must be a power
        of 2). Can be omitted if a pre-allocated buffer is passed.
    :type size: int
    :param buffer: optional pre-allocated buffer to use with RingBuffer.
        Note that if you pass a read-only buffer object, you still get a
        writable RingBuffer; it is your responsibility not to write
        there if the original buffer doesn't expect you to.
    :type buffer: buffer

    """

    def __init__(self, elementsize, size=None, buffer=None):
        self._ptr = self._ffi.new('PaUtilRingBuffer*')
        if buffer is None:
            if size is None:
                raise TypeError(
                    "size is required when buffer parameter is not specified")
            self._data = self._ffi.new('unsigned char[]', size * elementsize)
        elif size is not None:
            raise TypeError('exactly one of {size, buffer} is required')
        else:
            try:
                data = self._ffi.from_buffer(buffer)
            except TypeError:
                data = buffer
            size, rest = divmod(self._ffi.sizeof(data), elementsize)
            if rest:
                raise ValueError('buffer size must be multiple of elementsize')
            self._data = data

        res = self._lib.PaUtil_InitializeRingBuffer(
            self._ptr, elementsize, size, self._data)
        if res != 0:
            assert res == -1
            raise ValueError('size must be a power of 2')

    def flush(self):
        """Reset buffer to empty.

        Should only be called when buffer is **not** being read or
        written.

        """
        self._lib.PaUtil_FlushRingBuffer(self._ptr)

    @property
    def write_available(self):
        """Number of elements available in the ring buffer for writing."""
        return self._lib.PaUtil_GetRingBufferWriteAvailable(self._ptr)

    @property
    def read_available(self):
        """Number of elements available in the ring buffer for reading."""
        return self._lib.PaUtil_GetRingBufferReadAvailable(self._ptr)

    def write(self, data, size=-1):
        """Write data to the ring buffer.

        This advances the write index after writing;
        calling :meth:`advance_write_index` is *not* necessary.

        :param data: Data to write to the buffer.
        :type data: CData pointer or buffer or bytes
        :param size: The number of elements to be written.
        :type size: int, optional
        :returns: The number of elements written.

        """
        try:
            data = self._ffi.from_buffer(data)
        except TypeError:
            pass  # input is not a buffer
        if size < 0:
            size, rest = divmod(self._ffi.sizeof(data), self.elementsize)
            if rest:
                raise ValueError('data size must be multiple of elementsize')
        return self._lib.PaUtil_WriteRingBuffer(self._ptr, data, size)

    def read(self, size=-1):
        """Read data from the ring buffer into a new buffer.

        This advances the read index after reading;
        calling :meth:`advance_read_index` is *not* necessary.

        :param size: The number of elements to be read.
            If not specified, all available elements are read.
        :type size: int, optional
        :returns: A new buffer containing the read data.
            Its size may be less than the requested *size*.

        """
        if size < 0:
            size = self.read_available
        data = self._ffi.new('unsigned char[]', size * self.elementsize)
        size = self.readinto(data)
        return self._ffi.buffer(data, size * self.elementsize)

    def readinto(self, data):
        """Read data from the ring buffer into a user-provided buffer.

        This advances the read index after reading;
        calling :meth:`advance_read_index` is *not* necessary.

        :param data: The memory where the data should be stored.
        :type data: CData pointer or buffer
        :returns: The number of elements read, which may be less than
            the size of *data*.

        """
        try:
            data = self._ffi.from_buffer(data)
        except TypeError:
            pass  # input is not a buffer
        size, rest = divmod(self._ffi.sizeof(data), self.elementsize)
        if rest:
            raise ValueError('data size must be multiple of elementsize')
        return self._lib.PaUtil_ReadRingBuffer(self._ptr, data, size)

    def get_write_buffers(self, size):
        """Get buffer(s) to which we can write data.

        When done writing, use :meth:`advance_write_index` to make the
        written data available for reading.

        :param size: The number of elements desired.
        :type size: int
        :returns:
            * The room available to be written or the given *size*,
              whichever is smaller.
            * The first buffer.
            * The second buffer.
        :rtype: tuple (int, buffer, buffer)

        """
        ptr1 = self._ffi.new('void**')
        ptr2 = self._ffi.new('void**')
        size1 = self._ffi.new('ring_buffer_size_t*')
        size2 = self._ffi.new('ring_buffer_size_t*')
        return (self._lib.PaUtil_GetRingBufferWriteRegions(
                    self._ptr, size, ptr1, size1, ptr2, size2),
                self._ffi.buffer(ptr1[0], size1[0] * self.elementsize),
                self._ffi.buffer(ptr2[0], size2[0] * self.elementsize))

    def advance_write_index(self, size):
        """Advance the write index to the next location to be written.

        :param size: The number of elements to advance.
        :type size: int
        :returns: The new position.

        .. note:: This is only needed when using
            :meth:`get_write_buffers`, the method :meth:`write` takes
            care of this by itself!

        """
        return self._lib.PaUtil_AdvanceRingBufferWriteIndex(self._ptr, size)

    def get_read_buffers(self, size):
        """Get buffer(s) from which we can read data.

        When done reading, use :meth:`advance_read_index` to make the
        memory available for writing again.

        :param size: The number of elements desired.
        :type size: int
        :returns:
            * The number of elements available for reading (which might
              be less than the requested *size*).
            * The first buffer.
            * The second buffer.
        :rtype: tuple (int, buffer, buffer)

        """
        ptr1 = self._ffi.new('void**')
        ptr2 = self._ffi.new('void**')
        size1 = self._ffi.new('ring_buffer_size_t*')
        size2 = self._ffi.new('ring_buffer_size_t*')
        return (self._lib.PaUtil_GetRingBufferReadRegions(
                    self._ptr, size, ptr1, size1, ptr2, size2),
                self._ffi.buffer(ptr1[0], size1[0] * self.elementsize),
                self._ffi.buffer(ptr2[0], size2[0] * self.elementsize))

    def advance_read_index(self, size):
        """Advance the read index to the next location to be read.

        :param size: The number of elements to advance.
        :type size: int
        :returns: The new position.

        .. note:: This is only needed when using
            :meth:`get_read_buffers`, the methods :meth:`read` and
            :meth:`readinto` take care of this by themselves!

        """
        return self._lib.PaUtil_AdvanceRingBufferReadIndex(self._ptr, size)

    @property
    def elementsize(self):
        """Element size in bytes."""
        return self._ptr.elementSizeBytes

    def __len__(self):
        """Size of buffer in elements"""
        return self._ptr.bufferSize