Python使用PyAudio制作录音工具的实现代码
作者:宿者朽命
最近有在使用屏幕录制软件录制桌面,在用的过程中突发奇想,使用python能不能做屏幕录制工具,也锻炼下自己的动手能力。
接下准备写使用python如何做屏幕录制工具的系列文章:
大概上述四个部分,希望自己能够尽快完善,上一篇文章利用opencv制作了屏幕录制部分,接下继续更新系列,使用python录制音频。
应用平台
- windows 10
- python 3.7
音频录制部分
音频录制与视频录制相似,也是以数据帧的方式录制保存,这次使用强大的第三方包PyAudio和内置的wave模块编写主要部分代码:
pip install PyAudio
如果出现安装失败,可点击去此处下载对应.whl文件,cp37代表python3.7环境,64代表64位操作系统。
假如不是下载对应的whl包会导致安装失败,下载完成后,cmd窗口下进入whl的所在目录,使用pip install PyAudio-xx.whl
即可完成安装。
音频录制主要代码:
from pyaudio import PyAudio, paInt16, paContinue, paComplete # 设置固定参数 chunk = 1024 # 每个缓冲区的帧数 format_sample = paInt16 # 采样位数 channels = 2 # 声道: 1,单声道;2,双声道 fps = 44100 # 采样频率 # 这里采用回调的方式录制音频 def callback(in_data, frame_count, time_info, status): """录制回调函数""" wf.writeframes(in_data) if xx: # 当某某条件满足时 return in_data, paContinue else: return in_data, paComplete # 实例化PyAudio p = PyAudio() stream = p.open(format=format_sample, channels=channels, rate=fps, frames_per_buffer=chunk, input=True, input_device_index=None, # 输入设备索引, None为默认设备 stream_callback=callback # 回调函数 ) # 开始流录制 stream.start_stream() # 判断流是否活跃 while stream.is_active(): time.sleep(0.1) # 0.1为灵敏度 # 录制完成,关闭流及实例 stream.stop_stream() stream.close() p.terminate()
采取流式并用回调函数录制,需要先定义保存音频文件,用wave
新建音频二进制文件:
import wave wf = wave.open('test.wav', 'wb') wf.setnchannels(channels) wf.setsampwidth(p.get_sample_size(format_sample)) wf.setframerate(fps)
为了后续代码可以很好的与之结合复用,将上面的代码包装成类
from pyaudio import PyAudio class AudioRecord(PyAudio): def __init__(self,):
源码于文末补充。
音频播放部分
播放部分代码与录制部分代码相差不大,核心部分:
wf = wave.open('test.wav', 'rb') def callback(in_data, frame_count, time_info, status): data = wf.readframes(frame_count) return data, paContinue stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True, output_device_index=output_device_index, # 输入设备索引 stream_callback=callback # 输出用回调函数 ) stream.start_stream() while stream.is_active(): time.sleep(0.1)
目前暂时测试了.wav
和.mp3
格式可以正常录制及播放,其它类型格式音频可以自行调用代码进行测试。
GUI窗口所需属性值代码部分
考虑到GUI窗口能较为人性化的输出及输入值,编写该部分代码,内容含音频时长及获取输入设备及输出设备。
# 音频时长 duration = wf.getnframes() / wf.getframerate()
# 获取系统目前已安装的输入输出设备 dev_info = self.get_device_info_by_index(i) default_rate = int(dev_info['defaultSampleRate']) if not dev_info['hostApi'] and default_rate == fps and '映射器' not in dev_info['name']: if dev_info['maxInputChannels']: print('输入设备:', dev_info['name']) elif dev_info['maxOutputChannels']: print('输出设备:', dev_info['name'])
pynput监听键盘
在这部分代码也暂时使用pynput
监听键盘来对录音做中断处理。可以调用上一篇文章中的键盘监听代码。
def hotkey(self): """热键监听""" with keyboard.Listener(on_press=self.on_press) as listener: listener.join() def on_press(self, key): try: if key.char == 't': # t键,录制结束,保存音频 self.flag = True elif key.char == 'k': # k键,录制中止,删除文件 self.flag = True self.kill = True except Exception as e: print(e)
功能与上一篇类似,不再赘述。
总结
以上就是使用PyAudio调用windows的音频设备进行录制及播放,整体学习了使用类及其继承相关知识,用法在这只是展示了冰山一角,还有更多的知识等待着我们一起去探索!
于二零二一年十二月二十日作
源码:
import wave import time from pathlib import Path from threading import Thread from pyaudio import PyAudio, paInt16, paContinue, paComplete from pynput import keyboard # pip install pynput class AudioRecord(PyAudio): def __init__(self, channels=2): super().__init__() self.chunk = 1024 # 每个缓冲区的帧数 self.format_sample = paInt16 # 采样位数 self.channels = channels # 声道: 1,单声道;2,双声道 self.fps = 44100 # 采样频率 self.input_dict = None self.output_dict = None self.stream = None self.filename = '~test.wav' self.duration = 0 # 音频时长 self.flag = False self.kill = False def __call__(self, filename): """重载文件名""" self.filename = filename def callback_input(self, in_data, frame_count, time_info, status): """录制回调函数""" self.wf.writeframes(in_data) if not self.flag: return in_data, paContinue else: return in_data, paComplete def callback_output(self, in_data, frame_count, time_info, status): """播放回调函数""" data = self.wf.readframes(frame_count) return data, paContinue def open_stream(self, name): """打开录制流""" input_device_index = self.get_device_index(name, True) if name else None return self.open(format=self.format_sample, channels=self.channels, rate=self.fps, frames_per_buffer=self.chunk, input=True, input_device_index=input_device_index, # 输入设备索引 stream_callback=self.callback_input ) def audio_record_run(self, name=None): """音频录制""" self.wf = self.save_audio_file(self.filename) self.stream = self.open_stream(name) self.stream.start_stream() while self.stream.is_active(): time.sleep(0.1) self.wf.close() if self.kill: Path(self.filename).unlink() self.duration = self.get_duration(self.wf) print(self.duration) self.terminate_run() def run(self, filename=None, name=None, record=True): """音频录制线程""" thread_1 = Thread(target=self.hotkey, daemon=True) if record: # 录制 if filename: self.filename = filename thread_2 = Thread(target=self.audio_record_run, args=(name,)) # 播放 if not filename: raise Exception('未输入音频文件名,不能播放,请输入后再试!') thread_2 = Thread(target=self.read_audio, args=(filename, name,)) thread_1.start() thread_2.start() def read_audio(self, filename, name=None): """音频播放""" output_device_index = self.get_device_index(name, False) if name else None with wave.open(filename, 'rb') as self.wf: self.duration = self.get_duration(self.wf) self.stream = self.open(format=self.get_format_from_width(self.wf.getsampwidth()), channels=self.wf.getnchannels(), rate=self.wf.getframerate(), output=True, output_device_index=output_device_index, # 输出设备索引 stream_callback=self.callback_output ) self.stream.start_stream() while self.stream.is_active(): time.sleep(0.1) @staticmethod def get_duration(wf): """获取音频时长""" return round(wf.getnframes() / wf.getframerate(), 2) def get_in_out_devices(self): """获取系统输入输出设备""" self.input_dict = {} self.output_dict = {} for i in range(self.get_device_count()): dev_info = self.get_device_info_by_index(i) default_rate = int(dev_info['defaultSampleRate']) if not dev_info['hostApi'] and default_rate == self.fps and '映射器' not in dev_info['name']: if dev_info['maxInputChannels']: self.input_dict[dev_info['name']] = i elif dev_info['maxOutputChannels']: self.output_dict[dev_info['name']] = i def get_device_index(self, name, input_in=True): """获取选定设备索引""" if input_in and self.input_dict: return self.input_dict.get(name, -1) elif not input_in and self.output_dict: return self.output_dict.get(name, -1) def save_audio_file(self, filename): """音频文件保存""" wf = wave.open(filename, 'wb') wf.setnchannels(self.channels) wf.setsampwidth(self.get_sample_size(self.format_sample)) wf.setframerate(self.fps) return wf def terminate_run(self): """结束流录制或流播放""" if self.stream: self.stream.stop_stream() self.stream.close() self.terminate() def hotkey(self): """热键监听""" with keyboard.Listener(on_press=self.on_press) as listener: listener.join() def on_press(self, key): try: if key.char == 't': # t键,录制结束,保存音频 self.flag = True elif key.char == 'k': # k键,录制中止,删除文件 self.kill = True except Exception as e: print(e) if __name__ == '__main__': audio_record = AudioRecord() audio_record.get_in_out_devices() # 录制 print(audio_record.input_dict) audio_record.run('test.mp3') # 播放 print(audio_record.output_dict) audio_record.run('test.mp3', record=False)
到此这篇关于Python使用PyAudio制作录音工具的文章就介绍到这了,更多相关Python PyAudio录音工具内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!