mirror of
https://github.com/markqvist/Sideband.git
synced 2024-12-29 09:26:20 -05:00
128 lines
3.8 KiB
Python
128 lines
3.8 KiB
Python
import ctypes
|
|
|
|
from . import ogg
|
|
from . import opus
|
|
from .pyogg_error import PyOggError
|
|
|
|
class OpusFileStream:
    """Streams decoded 16-bit PCM out of an OggOpus file, one read at a time.

    The file is opened through ``opus.op_open_file`` and the native handle
    is released with ``opus.op_free`` when the object is finalized.
    """

    def __init__(self, path):
        """Opens an OggOpus file as a stream.

        path should be a string giving the filename of the file to
        open.  Unicode file names may not work correctly.

        Raises PyOggError if the library reports a non-zero error code
        while opening the file.

        """
        # Define the handle before anything can fail, so that __del__
        # always finds the attribute even if opening raises part-way.
        self.of = None

        error = ctypes.c_int()

        self.of = opus.op_open_file(ogg.to_char_p(path), ctypes.pointer(error))

        if error.value != 0:
            self.of = None
            raise PyOggError("file couldn't be opened or doesn't exist. Error code : {}".format(error.value))

        #: Number of channels in audio file
        self.channels = opus.op_channel_count(self.of, -1)

        #: Total PCM Length
        self.pcm_size = opus.op_pcm_total(self.of, -1)

        #: Number of samples per second (per channel)
        self.frequency = 48000

        # The buffer size should be (per channel) large enough to
        # hold 120ms (the largest possible Opus frame) at 48kHz.
        # See https://opus-codec.org/docs/opusfile_api-0.7/group__stream__decoding.html#ga963c917749335e29bb2b698c1cb20a10
        self.buffer_size = self.frequency // 1000 * 120 * self.channels
        self.Buf = opus.opus_int16 * self.buffer_size
        self._buf = self.Buf()
        # Pointer onto the start of the decode buffer, in the form
        # that is handed to op_read().
        self.buffer_ptr = ctypes.cast(
            ctypes.pointer(self._buf),
            opus.opus_int16_p
        )

        #: Bytes per sample
        self.bytes_per_sample = ctypes.sizeof(opus.opus_int16)

    def __del__(self):
        """Releases the native OggOpus handle, if one is still held."""
        # getattr() guards against instances whose __init__ never got
        # as far as assigning the handle.
        handle = getattr(self, "of", None)
        if handle is not None:
            # Forget the handle before freeing it so that a second
            # call can never free the same handle twice.
            self.of = None
            opus.op_free(handle)

    def get_buffer(self):
        """Obtains the next frame of PCM samples.

        Returns a bytes object holding signed 16-bit integers in the
        platform's native byte order.  If the file has more than one
        channel, the channels are interleaved.

        Returns None when all data has been read.

        The returned bytes object is a copy of the internal decode
        buffer, so it remains valid after later calls to
        :meth:`~get_buffer` or :meth:`~get_buffer_as_array`.

        Raises PyOggError if the decoder reports a negative result.

        """
        # Read the next frame
        samples_read = opus.op_read(
            self.of,
            self.buffer_ptr,
            self.buffer_size,
            None
        )

        # Check for errors
        if samples_read < 0:
            raise PyOggError(
                "Failed to read OpusFileStream. Error {:d}".format(samples_read)
            )

        # Check if we've reached the end of the stream
        if samples_read == 0:
            return None

        # View only the part of the buffer that was filled by this
        # read: samples_read values for each channel.
        result_ptr = ctypes.cast(
            self.buffer_ptr,
            ctypes.POINTER(opus.opus_int16 * (samples_read * self.channels))
        )

        # Copy the filled region out as Python bytes
        return bytes(result_ptr.contents)

    def get_buffer_as_array(self):
        """Provides the next frame of PCM samples as a NumPy array.

        The array has dtype int16 and shape (samples, channels).

        Returns None when all data has been read.

        The array is built directly over the bytes object returned by
        :meth:`~get_buffer`, so it is read-only; make a copy if the
        samples need to be modified.  It is not affected by later
        calls to :meth:`~get_buffer` or :meth:`~get_buffer_as_array`.

        """
        import numpy # type: ignore

        # Read the next samples from the stream
        buf = self.get_buffer()

        # Check if we've come to the end of the stream
        if buf is None:
            return None

        # Interpret the bytes as 16-bit samples
        array = numpy.frombuffer(buf, dtype=numpy.int16)

        # One row per sample, one column per channel
        return array.reshape(-1, self.channels)
|