Files
petshy/src/audio_input.py
2025-10-08 20:39:09 +08:00

168 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
音频输入模块 - 支持本地音频文件分析和实时麦克风输入
"""
import os
import numpy as np
import librosa
import soundfile as sf
from typing import Tuple, Optional, List, Dict, Any
# PyAudio is an optional dependency: it is only needed for live microphone
# capture, so a missing install disables that feature instead of breaking
# the import of this module.
try:
    import pyaudio
except ImportError:
    PYAUDIO_AVAILABLE = False
    print("警告: PyAudio未安装实时麦克风输入功能将不可用")
else:
    PYAUDIO_AVAILABLE = True
class AudioInput:
    """Audio input helper for local audio files and live microphone capture.

    File input is decoded with librosa and converted to mono at
    ``sample_rate``. Microphone input is captured through a PyAudio callback
    stream; each captured chunk is appended to ``buffer`` and handed out in
    FIFO order by :meth:`get_audio_chunk`.
    """

    def __init__(self, sample_rate: int = 16000, chunk_size: int = 1024):
        """Initialise the audio input state (no device is opened here).

        Args:
            sample_rate: Target sample rate in Hz. Defaults to 16000 Hz
                (noted in the original comments as the YAMNet requirement).
            chunk_size: Frames per microphone buffer. Defaults to 1024.
        """
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        self.stream = None            # open PyAudio stream, or None
        self.pyaudio_instance = None  # owning PyAudio object, or None
        self.buffer = []              # captured chunks, oldest first
        self.is_recording = False

    def load_from_file(self, file_path: str) -> Tuple[np.ndarray, int]:
        """Load an audio file as mono samples at ``self.sample_rate``.

        Args:
            file_path: Path of the audio file to load.

        Returns:
            A tuple ``(audio_data, sample_rate)`` where ``audio_data`` is a
            numpy array scaled into [-1.0, 1.0] and ``sample_rate`` is
            ``self.sample_rate``.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"音频文件不存在: {file_path}")

        # sr=None keeps the file's native rate so resampling happens only
        # when it is actually needed.
        audio_data, original_sr = librosa.load(file_path, sr=None, mono=True)

        # An empty signal has nothing to resample or normalise, and np.max
        # raises ValueError on a zero-size array.
        if audio_data.size == 0:
            return audio_data, self.sample_rate

        if original_sr != self.sample_rate:
            audio_data = librosa.resample(
                audio_data, orig_sr=original_sr, target_sr=self.sample_rate
            )

        # Scale down only when the signal exceeds [-1.0, 1.0]; quieter audio
        # is returned untouched.
        peak = np.max(np.abs(audio_data))
        if peak > 1.0:
            audio_data = audio_data / peak

        return audio_data, self.sample_rate

    def start_microphone_capture(self) -> bool:
        """Start capturing from the default microphone.

        Returns:
            True if capture is running (including when it was already
            running), False if PyAudio is unavailable or the stream could not
            be opened.
        """
        if not PYAUDIO_AVAILABLE:
            print("错误: PyAudio未安装无法使用麦克风输入")
            return False
        if self.is_recording:
            print("警告: 麦克风捕获已经在运行")
            return True

        # Prepare state BEFORE opening the stream: PyAudio starts a stream
        # immediately by default and runs the callback on its own thread. If
        # is_recording were still False on the first callback, the callback
        # would return paComplete and end the stream; resetting the buffer
        # after open() could also discard chunks that were already captured.
        self.buffer = []
        self.is_recording = True
        try:
            self.pyaudio_instance = pyaudio.PyAudio()
            self.stream = self.pyaudio_instance.open(
                format=pyaudio.paFloat32,
                channels=1,
                rate=self.sample_rate,
                input=True,
                frames_per_buffer=self.chunk_size,
                stream_callback=self._audio_callback,
            )
        except Exception as e:
            print(f"启动麦克风捕获失败: {e}")
            # Releases whatever was partially created and clears is_recording.
            self.stop_microphone_capture()
            return False
        return True

    def stop_microphone_capture(self) -> None:
        """Stop capturing and release the stream and the PyAudio instance.

        Cleanup is best-effort: every release step is attempted even if an
        earlier one fails, so a failing ``stop_stream()`` cannot leave the
        stream unclosed or PyAudio unterminated. Failures are reported with
        ``print`` rather than raised. Safe to call when nothing is open.
        """
        self.is_recording = False

        # Detach the handles first so the object is in a clean state even if
        # a release step below fails.
        stream, self.stream = self.stream, None
        pa_instance, self.pyaudio_instance = self.pyaudio_instance, None

        cleanup_steps = []
        if stream is not None:
            cleanup_steps += [stream.stop_stream, stream.close]
        if pa_instance is not None:
            cleanup_steps.append(pa_instance.terminate)

        for step in cleanup_steps:
            try:
                step()
            except Exception as e:
                print(f"停止麦克风捕获时出错: {e}")

    def get_audio_chunk(self) -> Optional[np.ndarray]:
        """Return the oldest buffered chunk and remove it from the buffer.

        Returns:
            The next audio chunk, or None when not recording or when no
            chunk is available.
        """
        if not self.is_recording:
            return None
        try:
            # EAFP: the buffer may be reset between an emptiness check and
            # the pop, so handle the empty case at the pop itself.
            return self.buffer.pop(0)
        except IndexError:
            return None

    def save_recording(self, audio_data: np.ndarray, file_path: str) -> bool:
        """Write audio samples to ``file_path`` at ``self.sample_rate``.

        Args:
            audio_data: Samples to save.
            file_path: Destination path; missing parent directories are
                created.

        Returns:
            True on success, False if saving failed (the error is printed).
        """
        try:
            # abspath makes dirname non-empty for bare file names.
            target_dir = os.path.dirname(os.path.abspath(file_path))
            os.makedirs(target_dir, exist_ok=True)
            sf.write(file_path, audio_data, self.sample_rate)
            return True
        except Exception as e:
            print(f"保存录音失败: {e}")
            return False

    def _audio_callback(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: store one captured chunk in the buffer.

        Args:
            in_data: Raw captured audio bytes (float32 samples).
            frame_count: Number of frames in ``in_data`` (unused).
            time_info: Timing information from PyAudio (unused).
            status: Status flags from PyAudio (unused).

        Returns:
            ``(None, flag)`` where ``flag`` is ``paContinue`` while recording
            and ``paComplete`` once recording has been switched off.
        """
        if not self.is_recording:
            return (None, pyaudio.paComplete)

        # frombuffer over an immutable bytes object yields a read-only view;
        # copy so consumers receive a writable, independent array.
        audio_data = np.frombuffer(in_data, dtype=np.float32).copy()
        self.buffer.append(audio_data)
        return (None, pyaudio.paContinue)