python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > python操作麦克风

python操作麦克风方式

作者:AI算法网奇

文章总结:作者分享了用 Python 操作麦克风的个人经验,包括查询、测试和读取麦克风的方法,以及使用 websocket_server.py 和 audio_client.py 通过 WebSocket 传输麦克风音频的示例,希望对大家有所帮助,并鼓励大家支持脚本之家

查询麦克风

import sounddevice as sd

# 1. List all host APIs.
print("=== All Host APIs ===")
for i, h in enumerate(sd.query_hostapis()):
    print(f"HostAPI {i}: {h['name']}")

print("\n=== All Devices ===")
# 2. List all devices with their host API, direction(s) and channel counts.
for i, d in enumerate(sd.query_devices()):
    hostapi = sd.query_hostapis(d['hostapi'])
    # A device is tagged "Input" and/or "Output" depending on which channel
    # counts are non-zero.
    device_type = [
        label
        for label, key in (("Input", 'max_input_channels'), ("Output", 'max_output_channels'))
        if d[key] > 0
    ]

    report_lines = (
        f"Device {i}: {d['name']}",
        f"  HostAPI: {hostapi['name']}",
        f"  Type: {', '.join(device_type)}",
        f"  Input channels: {d['max_input_channels']}",
        f"  Output channels: {d['max_output_channels']}",
    )
    for line in report_lines:
        print(line)
    print()

# 3. Report which devices are currently the defaults.
print("\n=== Testing Default Devices ===")
print(f"Default input device: {sd.default.device[0]}")
print(f"Default output device: {sd.default.device[1]}")


def audio_callback(indata, frames, time, status):
    """Print the stream status (when set) and the shape of the captured block."""
    if status:
        print(f"Status: {status}")
    print(f"Audio shape: {indata.shape}")


# 4. Record from the default input device until Enter is pressed.
default_stream_options = dict(
    samplerate=16000,
    channels=1,
    dtype='int16',
    blocksize=320,
    callback=audio_callback,
)
try:
    with sd.InputStream(**default_stream_options):
        input("🎤 Recording with default device... Press Enter to stop\n")
except Exception as e:
    print(f"Error: {e}")

# 5. Alternatively, list every device that has at least one input channel.
print("\n=== All Input Devices ===")
input_devices = []
for i, d in enumerate(sd.query_devices()):
    if d['max_input_channels'] <= 0:
        continue
    hostapi = sd.query_hostapis(d['hostapi'])
    input_devices.append((i, d, hostapi['name']))
    print(f"Device {i}: {d['name']}")
    print(f"  HostAPI: {hostapi['name']}")
    print(f"  Channels: {d['max_input_channels']}")

# 6. Record from the first input device that was found.
if not input_devices:
    print("No input devices found at all!")
else:
    print("\n=== Try using first available input device ===")
    mic_index, device_info = input_devices[0][:2]

    # Mono is the most widely supported layout.
    channels = min(1, device_info['max_input_channels'])

    def show_block_shape(indata, frames, time, status):
        """Print the shape of each captured block."""
        print(f"Audio shape: {indata.shape}")

    try:
        first_device_stream = sd.InputStream(
            device=mic_index,
            samplerate=16000,
            channels=channels,
            dtype='int16',
            blocksize=320,
            callback=show_block_shape,
        )
        with first_device_stream:
            input(f"🎤 Recording with {device_info['name']}... Press Enter to stop\n")
    except Exception as e:
        print(f"Error with device {mic_index}: {e}")

测试麦克风

import sounddevice as sd
import numpy as np

# Use device 10 directly.
# NOTE(review): the index is hard-coded and presumably machine-specific —
# confirm it against the device listing before running.
mic_index = 10

print("Testing microphone...")

def callback(indata, frames, time, status):
    """Print a one-line level meter for the block that was just captured.

    Args:
        indata: Captured samples as a NumPy array (integer or float dtype).
        frames: Number of frames in ``indata`` (unused).
        time: Timing information supplied by the stream (unused).
        status: Stream status flags; printed when truthy so that problems
            reported by the stream are not silently dropped.

    Integer samples are scaled to the range [-1.0, 1.0) before the level is
    computed, so the printed number is comparable between integer and
    floating-point stream settings.
    """
    if status:
        print(f"Status: {status}")

    samples = np.asarray(indata)
    if np.issubdtype(samples.dtype, np.integer):
        info = np.iinfo(samples.dtype)
        # Half the representable range: 32768 for int16, 128 for uint8.
        full_scale = (int(info.max) - int(info.min) + 1) / 2
        # Zero level: 0 for signed types, mid-range for unsigned types.
        midpoint = (int(info.max) + int(info.min) + 1) / 2
        samples = (samples.astype(np.float64) - midpoint) / full_scale

    volume = np.linalg.norm(samples) * 10
    # '\r' keeps the meter on a single, continuously overwritten line.
    print(f"Microphone level: {volume:.2f}", end='\r')

# Stream configurations to try, in order, until one of them opens.
settings_to_try = [
    dict(samplerate=16000, channels=1, dtype='int16'),
    dict(samplerate=44100, channels=1, dtype='float32'),
    dict(samplerate=48000, channels=1, dtype='int16'),
]

for i, settings in enumerate(settings_to_try):
    print(f"\nTry {i+1}: {settings}")
    try:
        candidate_stream = sd.InputStream(device=mic_index, callback=callback, **settings)
        with candidate_stream:
            input(f"Settings {i+1} working! Press Enter to stop...\n")
    except Exception as e:
        print(f"Failed: {e}")
    else:
        # This configuration worked, so the remaining ones are not tried.
        break

读取麦克风

import sounddevice as sd
import numpy as np
from scipy import signal

# NOTE(review): the device index is hard-coded and presumably
# machine-specific — confirm before running.
mic_index = 10
target_samplerate = 16000  # target sample rate
original_samplerate = 44100  # sample rate supported by the device

print(f"Recording at {original_samplerate}Hz, resampling to {target_samplerate}Hz")


def callback(indata, frames, time, status):
    """Downmix the captured block to mono and resample it to the target rate.

    Reads the module-level ``original_samplerate`` and ``target_samplerate``
    and prints the sample counts before and after resampling.
    """
    if status:
        print(status)

    # Average the channels when there is more than one; otherwise flatten.
    multi_channel = indata.shape[1] > 1
    audio = np.mean(indata, axis=1) if multi_channel else indata.flatten()

    # The output length follows the ratio of the two sample rates.
    rate_ratio_samples = len(audio) * target_samplerate / original_samplerate
    resampled = signal.resample(audio, int(rate_ratio_samples))

    print(f"Original: {len(audio)} samples, Resampled: {len(resampled)} samples", end='\r')


# Capture at the device rate until the user presses Enter.
resampling_stream = sd.InputStream(
    device=mic_index,
    samplerate=original_samplerate,
    channels=1,
    dtype='float32',
    callback=callback,
)
with resampling_stream:
    input("Recording and resampling... Press Enter to stop\n")

websocket_server.py

import asyncio
import websockets
import json
import numpy as np
from datetime import datetime
import time

def _level_meter(audio_data):
    """Return ``(rms, db, volume_bar)`` for one non-empty block of samples."""
    rms = np.sqrt(np.mean(audio_data ** 2))
    # Silence has no finite dB value, so it is pinned to -100.
    db = 20 * np.log10(rms) if rms > 0 else -100
    # -60 dB and below map to 0 segments, 0 dB and above to 20 (3 dB each).
    bars = max(0, min(int((db + 60) / 3), 20))
    return rms, db, "█" * bars + "░" * (20 - bars)


async def handle_audio_client(websocket, path=None):
    """Handle one audio client connection.

    Waits up to 5 seconds for a JSON text message describing the audio format
    and acknowledges it, then receives messages until the connection ends.
    Binary messages are interpreted as float32 samples and shown as a live
    level meter; text messages are printed. Connection statistics are printed
    when the handler finishes.

    Args:
        websocket: The connection object; must provide ``remote_address``,
            ``recv()``, ``send()`` and ``ping()``.
        path: Request path. Unused; optional so that the handler works both
            with websockets releases that pass ``(websocket, path)`` and with
            releases that pass only the connection.
    """
    client_id = id(websocket)
    client_ip = websocket.remote_address[0]

    print(f"\n✅ Client {client_id} connected from {client_ip}")

    # Defined before the try block so that the statistics in ``finally`` never
    # hit an unbound name when the handshake itself fails.
    packet_count = 0
    total_bytes = 0
    start_time = time.time()
    last_print_time = start_time

    try:
        # 1. Receive the audio format description.
        try:
            format_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
        except asyncio.TimeoutError:
            print("⚠️ No format received, assuming default settings")
        else:
            if isinstance(format_msg, str):
                format_data = json.loads(format_msg)
                print(f"📋 Audio format: {format_data}")

                # Acknowledge so the client starts streaming.
                await websocket.send(json.dumps({'status': 'ready', 'message': 'Start sending audio!'}))

        # 2. Receive audio data in real time.
        print("👂 Listening for audio data...")
        print("-" * 60)

        # Statistics cover the streaming phase only, not the handshake.
        start_time = time.time()
        last_print_time = time.time()

        try:
            while True:
                try:
                    message = await asyncio.wait_for(websocket.recv(), timeout=2.0)
                except asyncio.TimeoutError:
                    # Nothing arrived for 2 s: check the connection is alive.
                    try:
                        await websocket.ping()
                    except Exception:
                        # ``Exception`` rather than a bare except, so task
                        # cancellation is not swallowed here.
                        break
                    continue

                packet_count += 1

                if isinstance(message, str):
                    print(f"\n📨 Message: {message}")
                    continue
                if not isinstance(message, bytes):
                    continue

                total_bytes += len(message)

                try:
                    audio_data = np.frombuffer(message, dtype=np.float32)
                    if len(audio_data) == 0:
                        continue

                    rms, db, volume_bar = _level_meter(audio_data)

                    # Refresh the display at most once every 0.1 s.
                    current_time = time.time()
                    if current_time - last_print_time >= 0.1:
                        elapsed = current_time - start_time
                        data_rate = total_bytes / elapsed / 1024  # KB/s

                        print(f"\r🎤 Packets: {packet_count:4d} | "
                              f"Rate: {data_rate:5.1f} KB/s | "
                              f"RMS: {rms:6.4f} | "
                              f"dB: {db:6.1f} | "
                              f"Volume: [{volume_bar}]", end="", flush=True)

                        last_print_time = current_time

                except Exception as e:
                    # A malformed packet must not end the connection.
                    print(f"\n⚠️ Audio processing error: {e}")

        except websockets.exceptions.ConnectionClosed:
            print(f"\n🔌 Client disconnected normally")

    except Exception as e:
        print(f"\n❌ Error: {e}")

    finally:
        # Connection finished: print the statistics.
        elapsed = time.time() - start_time
        if elapsed > 0:
            print(f"\n" + "=" * 60)
            print(f"📊 Connection Statistics:")
            print(f"   Client ID: {client_id}")
            print(f"   Duration: {elapsed:.1f} seconds")
            print(f"   Packets received: {packet_count}")
            print(f"   Total data: {total_bytes / 1024:.1f} KB")
            print(f"   Average rate: {total_bytes / elapsed / 1024:.1f} KB/s")
            print(f"   Packets/sec: {packet_count / elapsed:.1f}")
            print("=" * 60)


async def main():
    """Start the WebSocket server on 0.0.0.0:8765 and wait until it closes."""
    serve_options = {
        "ping_interval": 10,
        "ping_timeout": 20,
        "max_size": 10 * 1024 * 1024,
    }
    server = await websockets.serve(handle_audio_client, "0.0.0.0", 8765, **serve_options)

    startup_banner = (
        f"✅ Server running on ws://0.0.0.0:8765",
        f"📡 Ready to receive audio streams",
        f"💡 Press Ctrl+C to stop\n",
    )
    for line in startup_banner:
        print(line)

    await server.wait_closed()


if __name__ == "__main__":
    # Run the server coroutine; a KeyboardInterrupt ends the script with a
    # short message instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Server stopped")

audio_client.py

import asyncio
import websockets
import sounddevice as sd
import numpy as np
import json
import threading
import queue
import time

# Start-up banner: title line followed by a 60-character rule.
print("🚀 WebSocket Audio Streaming Client", "=" * 60, sep="\n")


class WorkingClient:
    """Capture microphone audio and stream it to a WebSocket server.

    The input-stream callback copies sufficiently loud blocks into a bounded
    queue; ``connect_and_stream`` drains that queue and sends every block as
    one binary WebSocket message.
    """

    def __init__(
        self,
        server_url="ws://localhost:8765",
        device_index=1,
        sample_rate=44100,
        channels=1,
        dtype='float32',
        blocksize=1024,
        volume_threshold=0.52,
        queue_size=50,
    ):
        """Store the connection and capture settings.

        Args:
            server_url: WebSocket URL of the audio server.
            device_index: Index of the input device to open.
            sample_rate: Capture sample rate in Hz.
            channels: Number of input channels.
            dtype: Sample dtype requested from the stream.
            blocksize: Frames per callback block.
            volume_threshold: Blocks whose Euclidean norm does not exceed
                this value are discarded instead of being sent.
            queue_size: Maximum number of blocks buffered between the audio
                callback and the sender.
        """
        self.server_url = server_url
        self.device_index = device_index
        self.sample_rate = sample_rate
        self.channels = channels
        self.dtype = dtype
        self.blocksize = blocksize
        self.volume_threshold = volume_threshold

        # Audio queue shared by the stream callback and the sender loop.
        self.audio_queue = queue.Queue(maxsize=queue_size)

        # Control flags.
        self.running = True
        self.is_recording = False

        # Set by start_audio_capture(); None until a stream has been created.
        self.stream = None
        # Number of blocks enqueued so far; throttles the level display.
        self._queued_blocks = 0

    def _on_audio(self, indata, frames, time_info, status):
        """Stream callback: enqueue a copy of ``indata`` when it is loud enough."""
        if status:
            print(f"⚠️ Audio status: {status}")

        # Skip blocks that contain only zeros.
        if not np.any(indata):
            return

        volume = np.linalg.norm(indata)
        # Written as "not >" so that a NaN level is discarded as well.
        if not volume > self.volume_threshold:
            return

        audio_copy = indata.copy()
        try:
            # Never wait inside the audio callback.
            self.audio_queue.put_nowait(audio_copy)
        except queue.Full:
            # Queue is full: drop the oldest block so the newest one is kept.
            try:
                self.audio_queue.get_nowait()
            except queue.Empty:
                pass
            try:
                self.audio_queue.put_nowait(audio_copy)
            except queue.Full:
                return

        # Show the level for the first block and every 20th block after it.
        if self._queued_blocks % 20 == 0:
            print(f"\r🎤 Mic level: {volume:.4f} | Queue: {self.audio_queue.qsize()}", end="")
        self._queued_blocks += 1

    def start_audio_capture(self):
        """Start audio capture (runs in a separate thread).

        Returns:
            True when the input stream was opened and started, False when
            opening or starting it raised an exception.
        """
        print(f"🎤 Opening microphone (device {self.device_index})...")

        try:
            # Create the audio stream.
            self.stream = sd.InputStream(
                device=self.device_index,
                samplerate=self.sample_rate,
                channels=self.channels,
                dtype=self.dtype,
                blocksize=self.blocksize,
                callback=self._on_audio,
            )

            self.stream.start()
            self.is_recording = True
            print("✅ Microphone is ready! Speak now...")
            return True

        except Exception as e:
            print(f"❌ Failed to open microphone: {e}")
            return False

    async def connect_and_stream(self):
        """Connect to the server and send queued audio until stopped.

        Sends the audio format as JSON, waits for the server's reply, then
        forwards queued blocks while ``self.running`` is true. Connection and
        streaming errors are printed rather than raised.
        """
        print(f"\n🔗 Connecting to server...")

        try:
            # Connect the WebSocket.
            async with websockets.connect(self.server_url, ping_interval=10, ping_timeout=20) as websocket:

                print("✅ Connected to server!")

                # Send the audio format.
                await websocket.send(json.dumps({'type': 'audio_format', 'samplerate': self.sample_rate, 'channels': self.channels, 'dtype': self.dtype, 'blocksize': self.blocksize}))

                # Wait for the server's response.
                response = await websocket.recv()
                print(f"📨 Server: {response}")

                # Start streaming.
                print("\n" + "=" * 50)
                print("📤 Streaming audio to server...")
                print("💡 Speak into your microphone!")
                print("🛑 Press Ctrl+C to stop")
                print("=" * 50 + "\n")

                packet_count = 0
                last_display_time = time.time()

                # Main send loop.
                while self.running:
                    try:
                        audio_data = self.audio_queue.get_nowait()
                    except queue.Empty:
                        # Nothing to send yet: yield briefly. No manual ping
                        # is sent here; the connection is opened with
                        # ping_interval=10, and pinging whenever the queue is
                        # empty and the packet count is a multiple of 100
                        # would fire every 10 ms during silence.
                        await asyncio.sleep(0.01)
                        continue

                    try:
                        # Send the audio data.
                        await websocket.send(audio_data.tobytes())
                        packet_count += 1

                        # Show statistics once per second.
                        current_time = time.time()
                        if current_time - last_display_time >= 1.0:
                            queue_size = self.audio_queue.qsize()
                            print(f"\r📦 Packets: {packet_count:4d} | "
                                  f"Queue: {queue_size:2d} | "
                                  f"Sample: {audio_data[0, 0]:7.4f}...", end="")
                            last_display_time = current_time

                    except Exception as e:
                        print(f"\n⚠️ Streaming error: {e}")
                        break

                print(f"\n🛑 Stopped. Total packets sent: {packet_count}")

        except Exception as e:
            print(f"❌ Connection error: {e}")

    async def run(self):
        """Run the client: start capture, stream, then release the microphone."""
        try:
            # Start audio capture.
            if not self.start_audio_capture():
                return

            # Connect and stream.
            await self.connect_and_stream()

        except KeyboardInterrupt:
            print("\n👋 Stopped by user")
        except Exception as e:
            print(f"\n❌ Error: {e}")
        finally:
            # Clean up.
            self.running = False
            self.is_recording = False
            if self.stream is not None:
                self.stream.stop()
                self.stream.close()
                print("🔇 Microphone closed")


# Run the client.
async def main():
    """Create a WorkingClient with its default settings and run it."""
    await WorkingClient().run()


if __name__ == "__main__":
    # Ctrl+C surfaces from asyncio.run() as KeyboardInterrupt; catch it here,
    # as the server script does, so the client exits with a short message
    # instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Stopped by user")

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文