Sideband/sbapp/pyogg/vorbis_file_stream.py
2024-06-03 01:54:58 +02:00

111 lines
3.2 KiB
Python

import ctypes
from . import vorbis
from .pyogg_error import PyOggError
class VorbisFileStream:
    """Decode an Ogg Vorbis file incrementally into 16-bit PCM chunks.

    The file is opened in the constructor and decoded on demand by
    get_buffer() / get_buffer_as_array().  The underlying libvorbisfile
    handle is released when the end of the stream is reached, when
    clean_up() is called, or when the object is garbage collected.
    """

    # Sample width in bytes requested from ov_read() (its ``word``
    # argument); 2 selects 16-bit samples.
    _WORD_SIZE = 2

    # libvorbisfile's OV_HOLE return code: an interruption in the data
    # (garbage between pages, a corrupt page, ...).  Decoding can simply
    # continue after it, unlike the other negative return codes.
    _OV_HOLE = -3

    def __init__(self, path, buffer_size=8192):
        """Open the Vorbis file at ``path`` and allocate the decode buffer.

        path is handed unchanged to vorbis.ov_fopen().  The decode buffer
        holds ``buffer_size * channels`` bytes, which is also the maximum
        length of the bytes object returned by get_buffer().

        Raises PyOggError if ov_fopen() reports a non-zero error code.
        """
        #: True while the libvorbisfile handle in ``vf`` is open.
        self.exists = False
        self._buffer_size = buffer_size

        self.vf = vorbis.OggVorbis_File()
        error = vorbis.ov_fopen(path, ctypes.byref(self.vf))
        if error != 0:
            raise PyOggError("file couldn't be opened or doesn't exist. Error code : {}".format(error))

        # The handle is open from here on.  Flag it immediately so that
        # __del__ still releases it if anything below raises.
        self.exists = True

        info = vorbis.ov_info(ctypes.byref(self.vf), -1)

        #: Number of channels in audio file.
        self.channels = info.contents.channels

        #: Number of samples per second (per channel), as reported by
        # the stream's info struct.
        self.frequency = info.contents.rate

        #: Bytes per sample.  Matches the ``word`` argument that
        # get_buffer() passes to ov_read().
        self.bytes_per_sample = self._WORD_SIZE

        # Scratch buffer that ov_read() decodes into.  The cast pointer
        # keeps a reference to the array, so the memory stays alive as
        # long as ``buffer_`` does.
        array = (ctypes.c_char * (self._buffer_size * self.channels))()
        self.buffer_ = ctypes.cast(ctypes.pointer(array), ctypes.c_char_p)

        # Receives the index of the logical bitstream ov_read() decoded from.
        self.bitstream = ctypes.c_int()
        self.bitstream_pointer = ctypes.pointer(self.bitstream)

    def __del__(self):
        """Release the libvorbisfile handle if it is still open."""
        self.clean_up()

    def clean_up(self):
        """Close the stream and release the libvorbisfile handle.

        Safe to call more than once, and on an instance whose
        construction failed before the file was opened: it does nothing
        unless the handle is currently open.
        """
        if not getattr(self, "exists", False):
            return
        vorbis.ov_clear(ctypes.byref(self.vf))
        self.exists = False

    def get_buffer(self):
        """Return the next chunk of decoded PCM data as a bytes object.

        The chunk holds at most ``buffer_size * channels`` bytes of
        interleaved, signed, little-endian 16-bit samples.

        Returns None when all data has been read from the file (the
        stream is closed at that point) or when the stream is already
        closed.

        Raises PyOggError if ov_read() reports an unrecoverable error.
        """
        if not self.exists:
            return None

        capacity = self._buffer_size * self.channels
        # View of the scratch buffer as a fixed-size char array so the
        # decoded bytes can be sliced out of it.
        scratch = ctypes.cast(
            self.buffer_,
            ctypes.POINTER(ctypes.c_char * capacity)
        ).contents

        chunks = []
        total_bytes_written = 0

        while total_bytes_written < capacity:
            # Arguments after the length: little-endian (0), sample
            # width in bytes, signed samples (1).  Every call decodes
            # into the start of the scratch buffer.
            new_bytes = vorbis.ov_read(
                ctypes.byref(self.vf),
                self.buffer_,
                capacity - total_bytes_written,
                0,
                self._WORD_SIZE,
                1,
                self.bitstream_pointer
            )

            if new_bytes == 0:
                # End of file.
                break

            if new_bytes < 0:
                # Negative values are error codes, not byte counts.
                if new_bytes == self._OV_HOLE:
                    # Interruption in the data; keep decoding.
                    continue
                raise PyOggError(
                    "error while reading from Vorbis stream. Error code : {}".format(new_bytes)
                )

            # Copy the decoded bytes out before the next ov_read()
            # overwrites the scratch buffer.
            chunks.append(scratch[:new_bytes])
            total_bytes_written += new_bytes

        if total_bytes_written == 0:
            self.clean_up()
            return None

        return b"".join(chunks)

    def get_buffer_as_array(self):
        """Return the next chunk of decoded audio as a NumPy array.

        The array has shape ``(samples, channels)`` and holds 16-bit
        signed integers.  It is a read-only view over the bytes object
        produced by get_buffer(); copy it if it needs to be modified.

        Returns None when the end of the stream has been reached.
        """
        import numpy # type: ignore

        # Read the next samples from the stream
        buf = self.get_buffer()

        # Check if we've come to the end of the stream
        if buf is None:
            return None

        # Convert the bytes buffer to a NumPy array.  get_buffer() asks
        # ov_read() for little-endian samples, so spell the byte order
        # out rather than relying on the host's native order.
        array = numpy.frombuffer(
            buf,
            dtype=numpy.dtype("<i2")
        )

        # Reshape the array to one row per sample frame
        return array.reshape(
            (len(buf)
             // self.bytes_per_sample
             // self.channels,
             self.channels)
        )