from __future__ import annotations
import itertools
import math
import time
from array import array
from enum import IntEnum
from mrcrowbar import encoding
from mrcrowbar.common import bounds, is_bytes
try:
import miniaudio
except ImportError:
miniaudio = None
# Presumably a buffer size for resampling; not referenced in the visible part
# of this file -- TODO confirm whether it is still used elsewhere.
RESAMPLE_BUFFER = 4096
# Default chunk_size (in bytes) for normalise_audio_iter; resample_audio_iter
# also uses it as the number of n-channel samples per normalised chunk.
NORMALIZE_BUFFER = 8192
# Default output_rate (in Hz) for resample_audio_iter.
RESAMPLE_RATE = 44100
# Sample rate (in Hz) requested from the playback device by play_pcm.
PLAYBACK_RATE = 44100
# Name of a member of miniaudio.SampleFormat.
MINIAUDIO_NORMALISE_TYPE = "FLOAT32"
# Presumably the size in bytes of one MINIAUDIO_NORMALISE_TYPE sample; not
# referenced in the visible part of this file -- TODO confirm.
MINIAUDIO_NORMALISE_SIZE = 4
class AudioInterpolation( IntEnum ):
    """Interpolation algorithm to use when converting audio between sample rates."""

    #: Perform no explicit audio interpolation and let the audio backend sort it out.
    #: (Sounds like CUBIC)
    NONE = 0
    #: Perform sharp linear interpolation between samples.
    #: This is the algorithm used by most early DSPs, such as the one in the original
    #: Sound Blaster, and has a pleasing brightness and crispness to it.
    LINEAR = 1
    #: Perform sharp step interpolation between samples.
    #: I'm sure if you go nasty enough, you could find a DSP that sounds like this.
    #: This sounds like LINEAR, except with more distortion.
    STEP = 2
def mix_linear( a, b, alpha ):
    """Linearly interpolate between two samples.

    a
        Sample value returned when alpha is 0.0.

    b
        Sample value returned when alpha is 1.0.

    alpha
        Blend position between a and b.
    """
    return (b - a) * alpha + a


def mix_step( a, b, alpha ):
    """Step interpolation between two samples: always return a.

    b and alpha are accepted (and ignored) so that this function can be used
    interchangeably with mix_linear.
    """
    return a
def normalise_audio(
    source,
    format_type,
    field_size,
    signedness,
    endian,
    start=None,
    end=None,
    length=None,
):
    """Decode a byte string of PCM samples into an array of floats.

    source
        The byte string to decode.

    format_type
        Type of sample encoding; either int or float.

    field_size
        Size of each sample, in bytes.

    signedness
        Signedness of each sample; 'signed', or anything else for unsigned.

    endian
        Endianness of each sample, as understood by encoding.unpack_array.

    start
        Start offset to read from (default: start).

    end
        End offset to stop reading at (default: end).

    length
        Length to read in (optional replacement for end).

    Returns an array of type 'f'. Float samples are passed through as decoded.
    Integer samples are divided by 2^(bits - 1); unsigned integer samples are
    first recentred by subtracting 2^(bits - 1). Any other format_type yields
    an empty array.
    """
    assert is_bytes( source )
    start, end = bounds( start, end, length, len( source ) )

    sample_format = (format_type, field_size, signedness, endian)
    window = source[start:end]

    if format_type == float:
        return array( "f", encoding.unpack_array( sample_format, window ) )

    if format_type != int:
        # Unknown encoding: nothing to decode.
        return array( "f" )

    # Half of the integer range, i.e. the magnitude of the most negative value.
    scale = 1 << (field_size * 8 - 1)

    if signedness == "signed":
        scaled = (
            float( value ) / scale
            for value in encoding.unpack_array( sample_format, window )
        )
        return array( "f", scaled )

    # Unsigned: shift the midpoint down to zero before scaling.
    recentred = (
        float( value - scale ) / scale
        for value in encoding.unpack_array( sample_format, window )
    )
    return array( "f", recentred )
def normalise_audio_iter(
    source,
    format_type,
    field_size,
    signedness,
    endian,
    start=None,
    end=None,
    length=None,
    overlap=0,
    chunk_size=NORMALIZE_BUFFER,
):
    """Decode a byte string of PCM samples into float arrays, one chunk at a time.

    source
        The byte string to decode.

    format_type
        Type of sample encoding; either int or float.

    field_size
        Size of each sample, in bytes.

    signedness
        Signedness of each sample; either 'signed' or 'unsigned'.

    endian
        Endianness of each sample.

    start
        Start offset to read from (default: start).

    end
        End offset to stop reading at (default: end).

    length
        Length to read in (optional replacement for end).

    overlap
        Number of extra samples to decode past the end of each chunk, so that
        a consumer can look ahead across the chunk boundary.

    chunk_size
        Distance in bytes between the start of consecutive chunks.

    Yields the result of normalise_audio for each chunk. No chunk extends
    past the resolved end offset.
    """
    assert is_bytes( source )
    start, end = bounds( start, end, length, len( source ) )
    increment = chunk_size + overlap * field_size
    for chunk_start in range( start, end, chunk_size ):
        yield normalise_audio(
            source,
            format_type,
            field_size,
            signedness,
            endian,
            start=chunk_start,
            # Clamp to the requested end: passing only a length here would let
            # the last chunk (and every overlap region near it) decode bytes
            # beyond `end` whenever `end` is short of the end of the source.
            end=min( chunk_start + increment, end ),
        )
def resample_audio_iter(
    source,
    format_type,
    field_size,
    signedness,
    endian,
    channels,
    sample_rate,
    start=None,
    end=None,
    length=None,
    interpolation=AudioInterpolation.LINEAR,
    output_rate=RESAMPLE_RATE,
):
    """Resample a byte string of PCM audio, yielding normalised float samples.

    source
        The byte string to resample.

    format_type
        Type of sample encoding; either int or float.

    field_size
        Size of each sample, in bytes.

    signedness
        Signedness of each sample; either 'signed' or 'unsigned'.

    endian
        Endianness of each sample.

    channels
        Number of audio channels.

    sample_rate
        Sample rate of the source audio. A rate of 0 yields a single 0.0.

    start
        Start offset to read from (default: start).

    end
        End offset to stop reading at (default: end).

    length
        Length to read in (optional replacement for end).

    interpolation
        Interpolation algorithm to use. AudioInterpolation.STEP selects step
        mixing; every other value uses linear mixing.

    output_rate
        Sample rate of the yielded audio.

    Yields one float per channel for every output frame, channels interleaved.
    """
    if sample_rate == 0:
        yield 0.0
        return
    assert is_bytes( source )
    start, end = bounds( start, end, length, len( source ) )

    mixer = mix_step if interpolation == AudioInterpolation.STEP else mix_linear

    bytes_to_norm = field_size * channels  # size of one n-channel sample in bytes
    old_len = (end - start) // bytes_to_norm  # length in n-channel samples
    new_len = old_len * output_rate // sample_rate

    src_inc = NORMALIZE_BUFFER  # increment in n-channel samples
    # Each chunk carries one extra n-channel sample of overlap, so the sample
    # following the last one in the chunk is available for interpolation.
    src_iter = normalise_audio_iter(
        source,
        format_type,
        field_size,
        signedness,
        endian,
        start,
        end,
        overlap=channels,
        chunk_size=src_inc * bytes_to_norm,
    )
    src = next( src_iter, None )
    src_bound = src_inc  # exclusive upper bound of the current chunk, in n-channel samples

    for tgt_pos in range( new_len ):
        src_pos = tgt_pos * sample_rate / output_rate  # position in n-channel samples

        # The current chunk covers positions [src_bound - src_inc, src_bound).
        # The comparison must be inclusive: at src_pos == src_bound the index
        # below has already wrapped to 0, which refers to the NEXT chunk.
        while src_bound <= src_pos:
            src = next( src_iter, None )
            src_bound += src_inc
        if src is None:
            break

        base = (math.floor( src_pos ) % src_inc) * channels  # normalised array index
        alpha = math.fmod( src_pos, 1.0 )
        src_len = len( src )
        for c in range( channels ):
            a_index = base + c
            b_index = a_index + channels
            # Samples beyond the end of the decoded data are treated as silence.
            a = src[a_index] if a_index < src_len else 0.0
            b = src[b_index] if b_index < src_len else 0.0
            yield mixer( a, b, alpha )
def play_pcm(
    source,
    channels,
    sample_rate,
    format_type,
    field_size,
    signedness,
    endian,
    start=None,
    end=None,
    length=None,
    interpolation=AudioInterpolation.LINEAR,
):
    """Play back a byte string as PCM audio.

    Blocks until playback has finished.

    source
        The byte string to play.

    channels
        Number of audio channels.

    sample_rate
        Audio sample rate in Hz.

    format_type
        Type of sample encoding; either int or float.

    field_size
        Size of each sample, in bytes.

    signedness
        Signedness of each sample; either 'signed' or 'unsigned'.

    endian
        Endianness of each sample; either 'big', 'little' or None.

    start
        Start offset to read from (default: start).

    end
        End offset to stop reading at (default: end).

    length
        Length to read in (optional replacement for end).

    interpolation
        Interpolation algorithm to use for upsampling. Defaults to AudioInterpolation.LINEAR.
        Accepted for API compatibility; it is not currently applied, as the
        conversion to the device's sample rate is done by miniaudio.convert_frames.

    Raises ImportError if miniaudio is not installed, and ValueError if the
    combination of format_type, field_size, signedness and endian is not one
    of the supported sample formats.
    """
    assert is_bytes( source )
    start, end = bounds( start, end, length, len( source ) )
    if not miniaudio:
        raise ImportError(
            "miniaudio must be installed for audio playback support (see https://github.com/irmen/pyminiaudio)"
        )

    FORMAT_MAP = {
        (int, 1, "unsigned", None): miniaudio.SampleFormat.UNSIGNED8,
        (int, 2, "signed", "little"): miniaudio.SampleFormat.SIGNED16,
        (int, 3, "signed", "little"): miniaudio.SampleFormat.SIGNED24,
        (int, 4, "signed", "little"): miniaudio.SampleFormat.SIGNED32,
        (float, 4, "signed", "little"): miniaudio.SampleFormat.FLOAT32,
    }
    sample_format = FORMAT_MAP.get( (format_type, field_size, signedness, endian) )
    if sample_format is None:
        raise ValueError( "Format not supported yet!" )

    with miniaudio.PlaybackDevice(
        output_format=sample_format, nchannels=channels, sample_rate=PLAYBACK_RATE
    ) as device:

        def audio_iter():
            # Convert the whole selection to the device's format up front,
            # then hand it out in pieces of the size the device asks for.
            conv = miniaudio.convert_frames(
                sample_format,
                channels,
                sample_rate,
                source[start:end],
                device.format,
                device.nchannels,
                device.sample_rate,
            )
            samp_iter = iter( conv )
            # Priming yield: the first value sent in is the first frame count.
            required_frames = yield b""
            while True:
                sample_data = bytes(
                    itertools.islice(
                        samp_iter, required_frames * channels * field_size
                    )
                )
                if not sample_data:
                    break
                required_frames = yield sample_data

        ai = audio_iter()
        next( ai )  # advance to the priming yield so the generator can receive send()
        device.start( ai )
        # Poll until the device no longer holds a callback generator.
        while device.callback_generator:
            time.sleep( 0.1 )