Python+pyaudio实现音频控制示例详解
作者:小树笔记
PyAudio 是语音处理的 Python 库,提供了比较丰富的功能。本文将利用pyaudio控制指定设备,实现录制音频、采集音频流、播放音频,感兴趣的可以了解一下
简介
PyAudio是一个跨平台的音频处理工具包,使用该工具包可以在Python程序中播放和录制音频,也可以产生wav文件等
安装
pip install PyAudio
注意:使用该命令安装时可能会报错,报错内容如下:
针对该问题,我们使用whl文件进行安装,首先在网址下面找到以下文件并下载,根据自己的python版本及计算机系统下载相应文件即可。
下载完成后,切换到文件所在目录,使用如下命令安装即可
pip3 install PyAudio-0.2.11-cp38-cp38-win_amd64.whl
pyaudio控制指定设备,录制音频/采集音频流/播放音频
#!/usr/bin/env python3 #-*- coding:utf-8 -*- #------------- 音频设备操作模块 ------------------- # # 功能: 录制/获取音频流/播放音频 # 时间: 2021-09-13 # #-------------------------------------------------- import sys ,pyaudio, wave from tqdm import tqdm class UacAudioInAndOut: def __init__(self): """ 功能: 录音参数初始化 创建vad检测模块对象 参数: / 返回值: / """ self.input_format_dict = {"S8_LE":16, "S16_LE":8, "S24_LE":4, "S32_LE":2} self.framerate_list = [8000, 11025, 16000, 22050, 32000, 44100, 48000, 88200, 96000, 176400, 192000, 352800, 384000] def _inforPrintf(self, infor_content): """ 功能: 检测操作系统,使用正确编码 输出打印信息 参数: infor_content: 信息内容 返回值: / """ if sys.platform != "linux" and sys.platform != "darwin": infor_content = str(infor_content).encode("gbk","ignore").decode("gbk") print(infor_content) def GetAllDevInfor(self): """ 功能: 显示支持设备信息 参数: / 返回值: / """ PA = pyaudio.PyAudio() self._inforPrintf("----------------------< 本机支持设备 >------------------------------") for dev_index in range(PA.get_device_count()): self._inforPrintf("\n-------------------------------------------------------") for key in PA.get_device_info_by_index(dev_index): self._inforPrintf("%s:%s"%(key, str(PA.get_device_info_by_index(dev_index)[key]))) self._inforPrintf("========================================================") def GetUacDevInfor(self, devKeywordOrIndex=None): """ 功能: 获取UAC设备信息 参数: devKeywordOrIndex: 设备名称关键字或索引 返回值: dic 设备信息字典 False 设备信息获取失败 """ PA = pyaudio.PyAudio() if devKeywordOrIndex == None: self._inforPrintf("\033[0;36;31m[UacAudioInAndOut] 未设设备, 当前使用默认设备\033[0m") return PA.get_default_input_device_info() if str(devKeywordOrIndex).isdigit(): devKeywordOrIndex = int(devKeywordOrIndex) return PA.get_device_info_by_index(devKeywordOrIndex) uac_infor_list = [] for uac_index in range(PA.get_device_count()): if PA.get_device_info_by_index(uac_index).get("name").find(str(devKeywordOrIndex)) >= 0: uac_infor_list.append(PA.get_device_info_by_index(uac_index)) if len(uac_infor_list) > 1: self._inforPrintf("\033[0;36;33m[UacAudioInAndOut] UAC 设备有多个,\ 请修正关键字, 当前设备如下: %s\033[0m"%str(uac_infor_list)) return False else: return uac_infor_list.pop() def is_framerate_supported(self, setFramerate, UacAudioInHandle, load_parame_dict, input_or_output="input"): """ 功能: 判断当配置在指定设备中是否支持 参数: setFramerate: 设置采样率 UacAudioInHandle: 设备句柄 load_parame_dict: 加载字典 input_or_output: 输入/输出功能 返回值: bool True/False """ try: if input_or_output == "input": UacAudioInHandle.is_format_supported(rate=float(setFramerate), input_device=load_parame_dict['index'], input_channels=load_parame_dict['setInputChannels'], input_format=load_parame_dict['_setInputFormat']) else: UacAudioInHandle.is_format_supported(rate=float(setFramerate), output_device=load_parame_dict['index'], output_channels=load_parame_dict['maxOutputChannels'], output_format=UacAudioInHandle.get_format_from_width(load_parame_dict['setOutputFormat'])) return True except: return False def LoadUacAudioInDevice(self, maxStreamDuration=1000, setInputChannels=None, setInputFormat=None, devKeywordOrIndex=None): """ 功能: 加载音频获取设备 参数: maxStreamDuration=1000 默认一段流时长 setInputChannels: 通道数 setInputFormat: 位宽 devKeywordOrIndex: 录音设备关键字/索引 返回值: 成功: UacAudioInHandle, StreamHandle, load_parame_dict 失败: False """ try: load_parame_dict = {} uac_infor_dict = self.GetUacDevInfor(devKeywordOrIndex) if not setInputFormat: _Format = "S16_LE" self._inforPrintf("\033[0;36;33m[UacAudioInAndOut] 未设置位宽,使用默认 S16_LE \033[0m") else: _Format = setInputFormat setInputFormat = self.input_format_dict[_Format] if not setInputChannels or int(setInputChannels) > uac_infor_dict["maxInputChannels"]: setInputChannels = uac_infor_dict["maxInputChannels"] self._inforPrintf("\033[0;36;33m[UacAudioInAndOut] 输入通道未设置/超出当前设备最大值,使用默认最大通道 %s\ \033[0m"%setInputChannels) else: setInputChannels = int(setInputChannels) dev_index = uac_infor_dict["index"] load_parame_dict["index"]=dev_index load_parame_dict["setInputFormat"] = _Format load_parame_dict["_setInputFormat"] = setInputFormat load_parame_dict["setInputChannels"] = setInputChannels UacAudioInHandle = pyaudio.PyAudio() for setInputFramerate in self.framerate_list: if self.is_framerate_supported(setInputFramerate, UacAudioInHandle, load_parame_dict): load_parame_dict["setInputFramerate"] = setInputFramerate break #计算数据大小一段 CHUNK_SIZE = int(setInputFramerate * maxStreamDuration / 1000) load_parame_dict["CHUNK_SIZE"] = CHUNK_SIZE self._inforPrintf("\033[0;36;38m[UacAudioInAndOut] 加载参数: %s\033[0m"%str(load_parame_dict)) #加载设备 StreamHandle = UacAudioInHandle.open( format=load_parame_dict['_setInputFormat'], channels=load_parame_dict['setInputChannels'], rate=load_parame_dict['setInputFramerate'], input=True, input_device_index=load_parame_dict['index'], start=False, frames_per_buffer=int(CHUNK_SIZE)) #开始流获取 StreamHandle.start_stream() return UacAudioInHandle, StreamHandle, load_parame_dict except: self._inforPrintf("\033[0;36;31m[UacAudioInAndOut] Uac AudioIn 加载失败\033[0m") return False, False, False def LoadUacAudioOutDevice(self, devKeywordOrIndex): """ 功能: 加载音频输出设备 参数: / 返回值: UacAudioInHandle 或 False """ try: uac_infor_dict = self.GetUacDevInfor(devKeywordOrIndex) UacAudioInHandle = pyaudio.PyAudio() return UacAudioInHandle, uac_infor_dict except: return False def GetUacAudioInStream(self, StreamHandle, CHUNK_SIZE): """ 功能: 开始采集声卡音频 生成音频流 参数: UacAudioInHandle: 设备句柄 StreamHandle: 流句柄 返回值 chunk_data 流数据 """ return StreamHandle.read(CHUNK_SIZE, exception_on_overflow=False) #防止溢出 def UacAudioOutPlay(self, playWavFile, Repeat=None, Pdict=None, devKeywordOrIndex=None,): """ 功能: 可以循环播放指定文件 参数: playWavFile: 播放文件路径 Repeat: 循环播放次数 CustomizeAudioParam: 自定义播放参数 返回值: / """ UacAudioInHandle, uac_infor_dict = self.LoadUacAudioOutDevice(devKeywordOrIndex) self._inforPrintf(str(uac_infor_dict).encode("gbk","ignore").decode("gbk")) self._inforPrintf("\033[1;36;34m[UacAudioInAndOut] 指定设备: %s\t播放文件: %s\t循环总数: %s\ \033[0m"%(devKeywordOrIndex, playWavFile,Repeat)) try: chunk=1024 pfb = wave.open(playWavFile, 'rb') setOutputFormat = pfb.getsampwidth() setOutputChannels = pfb.getnchannels() setOutputFramerate = pfb.getframerate() uac_infor_dict['setOutputFormat'] = setOutputFormat if setOutputChannels > uac_infor_dict["maxOutputChannels"]: self._inforPrintf("\033[0;36;31m[UacAudioInAndOut] 当前通道数,在该设备上不支持, \ 设备最大通道数: %s\033[0m"%uac_infor_dict["maxOutputChannels"]) return False if not self.is_framerate_supported(setOutputFramerate, UacAudioInHandle, uac_infor_dict, "output"): self._inforPrintf("\033[0;36;31m[UacAudioInAndOut] 当前文件采样率,在该设备上不支持,\ 设备默认采样率: %s\033[0m"%uac_infor_dict["defaultSampleRate"]) return False else: uac_infor_dict["defaultSampleRate"] = setOutputFramerate stream = UacAudioInHandle.open( output_device_index=uac_infor_dict['index'], format=UacAudioInHandle.get_format_from_width(setOutputFormat), channels=setOutputChannels, rate=setOutputFramerate, output=True) if Repeat == "Dead_cycle": self._inforPrintf("\033[1;36;33m[UacAudioInAndOut] Dead cycle play !!! \033[0m") while True: if type(Pdict) == dict and Pdict["play status"] == "stop": break pfb = wave.open(playWavFile, 'rb') while True: data = pfb.readframes(chunk) if not data: break stream.write(data) else: for index in tqdm(range(int(Repeat))): if type(Pdict) == dict and Pdict["play status"] == "stop": break pfb = wave.open(playWavFile, 'rb') while True: data = pfb.readframes(chunk) if not data: break stream.write(data) stream.stop_stream() stream.close() self.CloseAudioDevice(UacAudioInHandle) return True except: stream.stop_stream() stream.close() return False def UacAudioInRecord(self, saveWavFile, recordTime, #单位秒 setInputChannels=None, setInputFormat=None, devKeywordOrIndex=None): """ 功能: 录制音频文件 参数: recordTime: 录音时长, 单位(s) setInputFramerate: 采样率 setInputChannels: 通道数 setInputFormat: 位宽 devKeywordOrIndex: 录音设备索引 返回值: / """ maxStreamDuration=1000 load_parame_dict = {} UacAudioInHandle, StreamHandle, load_parame_dict = self.LoadUacAudioInDevice( maxStreamDuration, setInputChannels, setInputFormat, devKeywordOrIndex) if not UacAudioInHandle or not StreamHandle: self._inforPrintf("\033[0;36;31m[UacAudioInAndOut] 录音失败\033[0m") return False self._inforPrintf("\033[1;36;34m[UacAudioInAndOut] 录音 -> 文件名: %s 时长: %s\ \033[0m"%(saveWavFile,recordTime)) self._inforPrintf(load_parame_dict["CHUNK_SIZE"]) data_list = [] for recordTime_index in range(int(recordTime)): data = None data = StreamHandle.read(load_parame_dict["CHUNK_SIZE"], exception_on_overflow=False) data_list.append(data) StreamHandle.stop_stream() StreamHandle.close() self.CloseAudioDevice(UacAudioInHandle) with wave.open(saveWavFile, "wb") as wavfb: wavfb.setnchannels(load_parame_dict["setInputChannels"]) wavfb.setsampwidth(UacAudioInHandle.get_sample_size(load_parame_dict["_setInputFormat"])) wavfb.setframerate(load_parame_dict["setInputFramerate"]) wavfb.writeframes(b''.join(data_list)) """ 功能: 关闭音频流设备 参数: UacAudioInHandle 返回值: bool True/False """ try: StreamHandle.stop_stream() StreamHandle.close() self.CloseAudioDevice() return True except: return False def CloseAudioDevice(self, UacAudioDeviceHandle): """ 功能: 释放 Audio 设备 参数: UacAudioDeviceHandle 返回值: bool True/False """ try: UacAudioDeviceHandle.terminate() return True except: return False if __name__=="__main__": asv = UacAudioInAndOut() asv.GetAllDevInfor() #asv.UacAudioOutPlay(sys.argv[1], int(sys.argv[2]), None, sys.argv[3]) asv.UacAudioInRecord(sys.argv[1], sys.argv[2])
以上就是Python+pyaudio实现音频控制示例详解的详细内容,更多关于Python pyaudio音频控制的资料请关注脚本之家其它相关文章!