Python基于WebSocket实现简易屏幕共享工具
作者:CrazyDragon_King
前面写了两个简单的屏幕共享工具,不过那只是为了验证通过截屏的方式是否可行,因为通常手动截屏的频率很低,而对于视频来说它的帧率要求就很高了,至少要一秒30帧率左右。所以,经过实际的截屏工具验证,我了解了几个Python截屏库的特点和限制。例如,多数截屏库都不支持很高的截屏速度,并且截屏是典型的 CPU 密集任务(我尝试使用多线程截屏,发现速度更慢了,之后有时间我也会把这一点整理成文章发出来)。
所以,我的初始的想法其实是基于 WebSocket 来实现的。现在,就让我们对先前的代码进行重构,采用 WebSocket 来传输图片数据。不过我这里没有使用到它的双向传输的特性,只是将原来 HTTP 传输的图片换成通过 WebSocket 来传输了。不过这里后续还有很多东西可以开发,如果有时间的话,也可以基于这个做一些有趣的东西。
演示
我这个笔记本的性能可能不太行,我只要打开视频帧率就降低了很多,哈哈。
截取播放B站视频

截取摄像头画面
这样甚至可以远程共享画面了,如果两个人都布置一个,就可以各自看到对话了(不过没有声音,且效率低下,可能也就只能在局域网使用),不过这样它是相当于从服务器的地方获取的数据,而我们平时使用的视频通话工具都是从客户端获取的数据。
而且页面越多,帧率越低,这里可能要优化一下或者它就是这么累赘,只能个人使用。=

说明
注意,这里的测试环境是 Windows,因为有些库依赖于 Windows 提供的特性,所以需要在 Windows 上运行它。在程序启动时,它会开启一个截屏的线程,不断去获取屏幕的截屏,如果有用户访问,它就会通过 WebSocket 连接,把获取的屏幕截图数据传送给前端,前端的逻辑就是将它绘制在 canvas 上,并添加帧率显示。
注意:这部分前端的代码是 AI 生成,对于验证小功能来说,AI 真是太完美了。而且,其实这整个部分都可以让 AI 来做,但是具体是哪些的组合还是自己选择的,毕竟每个人的偏好和技术栈不同。
代码
所有的代码都在这里了,大概60行代码,我把前端压缩成一行代码了。如果要运行代码先要安装 flask, flask_sock, pillow, dxcam 库。
import time
from flask import Flask
from flask_sock import Sock
from io import BytesIO
from PIL import Image
import dxcam
# 创建应用
app = Flask(__name__)
sockets = Sock(app)
# 整个应用只创建一个即可
camera = dxcam.create(device_idx=0, output_idx=1) # output_idx 0 是第一块屏幕,1 是第二块屏幕
camera.start(target_fps=60, video_mode=True)
JPEG_QUALITY = 80 # 默认的jpeg图片质量,越高需要的计算量越大,同时越清晰
# 直接前后端写一起了,这个只是一个演示的demo
INDEX_HTML = """
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Screen Sharing</title> <style> body { display: flex; justify-content: center; align-items: center; height: 100vh; margin: 0; background-color: #f0f0f0; } canvas { border: 1px solid black; } </style> </head> <body> <canvas id="screenCanvas" width="960" height="540"></canvas> <script> const canvas = document.getElementById('screenCanvas'); const ctx = canvas.getContext('2d'); const ws = new WebSocket('ws://'+window.location.host+'/remote_desktop'); ws.onopen = () => { console.log("WebSocket connected"); }; ws.onerror = (event) => { console.error("WebSocket error:", event); }; let lastFrameTime = performance.now(); let frameCount = 0; let fps = 0; ws.onmessage = (event) => { const image = new Image(); image.src = URL.createObjectURL(event.data); image.onload = () => { ctx.drawImage(image, 0, 0, canvas.width, canvas.height); frameCount++; const now = performance.now(); const elapsed = now - lastFrameTime; if (elapsed >= 1000) { fps = frameCount / (elapsed / 1000); frameCount = 0; lastFrameTime = now; } displayFPS(); }; }; function displayFPS() { ctx.font = '30px Arial'; ctx.fillStyle = 'red'; ctx.fillText(`FPS: ${Math.round(fps)}`, 50, 50); } </script> </body> </html>
"""
@app.route('/', methods=['GET'])
def index():
"""简单的前端"""
return INDEX_HTML
@sockets.route('/remote_desktop')
def get_desktop(ws):
"""
获取一帧图片,并发送给前端
"""
buffer = BytesIO()
fps = 0 # 计算后端生成的帧率
frame_count = 0
last_frame_time = time.perf_counter()
while True:
reset_buffer(buffer) # 每次重置buffer,方便复用
img_data = get_frame(buffer)
frame_count += 1
elapsed = time.perf_counter() - last_frame_time
if elapsed >= 1:
fps = frame_count // elapsed
last_frame_time = time.perf_counter()
frame_count = 0
print("backend real frame fps: ", fps)
ws.send(img_data)
def get_frame(buffer):
"""
获取一帧,然后处理成jpeg并返回二进制数据
"""
image_array = camera.get_latest_frame()
Image.fromarray(image_array).save(buffer, format="JPEG", quality=JPEG_QUALITY)
return buffer.getvalue()
def reset_buffer(buffer):
# 重置buffer,方便复用
buffer.truncate(0)
buffer.seek(0)
if __name__ == '__main__':
print("remote desktop server starting...")
app.run("0.0.0.0", 9000)
方法补充
除了上文的方法,小编为大家整理了其他两个实现屏幕共享的方法,希望对大家有所帮助
方法一:50 行代码实现简易屏幕共享工具
思路
它的思路很简单,程序不断的截屏然后发送给浏览器去显示,效果也还凑合吧。这里最重要的就是 multipart/x-mixed-replace; boundary=frame 这个首部的使用了,好几年前还在大学时,我买了一个树莓派+摄像头,使用一个开源的软件叫 motion,搭建了一个局域网的视频 监控。这个 motion 的网页端就是使用该首部实现的,因为我当时很好奇就去了解了它。
代码
如果要运行代码需要安装对应的依赖:
pip install flask, mss, PIL
import time
from io import BytesIO
from flask import Flask, Response
import mss
from PIL import Image, ImageDraw, ImageFont
app = Flask(__name__)
@app.route('/monitor', methods=['GET'])
def monitor():
"""获取屏幕数据并返回"""
def gen_img():
with mss.mss() as sct:
# 获取屏幕的全部区域
screen = sct.monitors[1]
buffer = BytesIO()
fps = 0 # 帧率
frame_count = 0 # 计算帧率,这是服务端生成的速率,到了客户端还会更低
start = time.time()
while True:
# 抓取屏幕并返回
screenshot = sct.grab(screen)
# 将截图数据转换为 JPEG 格式的二进制数据
img = Image.frombytes("RGB", screenshot.size, screenshot.bgra, "raw", "BGRX")
if fps > 0:
img = add_fps(img, fps)
img.save(buffer, format="JPEG")
jpeg_data = buffer.getvalue()
yield (b'--frame\r\n' +
b'Content-Type: image/jpeg\r\n\r\n' + jpeg_data + b'\r\n')
# 重置buffer,方便复用
buffer.seek(0)
buffer.truncate(0)
frame_count += 1
elapsed = time.time() - start
if elapsed >= 1:
fps = frame_count // elapsed
frame_count = 0
start = time.time()
return Response(gen_img(), mimetype="multipart/x-mixed-replace; boundary=frame")
def add_fps(img, fps):
"""在图片上添加fps"""
draw = ImageDraw.Draw(img)
draw.text((100, 100), f"fps: {fps}", (255, 0, 0), ImageFont.truetype("arial.ttf", 100))
return img
if __name__ == '__main__':
print("monitoring...")
app.run("127.0.0.1", 9000)
方法二:简易共享屏幕工具改进版
改进
1.降低分辨率
方法一那个测试的帧率低,最大的原因是电脑的分辨率有点高了,它是2.5K屏幕,所以这次换成了 1920*1080 的屏幕来测试,效果就好很多了,基本都在 50 帧左右。
2.更换截屏库
更换了一个速度更快的截屏库,这个截屏库的速度很快,比方法一那个快多了,但是反应到实际的帧率上就低了,所以还是后续其他的操作太过于耗时,但是暂时也没有想到啥优化的好方法。
3.降低图片的质量
JPEG 格式的图片是可以选择压缩质量的,适当的调低质量可以提高处理的速度。
代码
from flask import Flask, Response
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
import time
import dxcam
app = Flask(__name__)
# 整个应用只创建一个即可
camera = dxcam.create(device_idx=0, output_idx=1) # output_idx 0 是第一块屏幕,1 是第二块屏幕
camera.start(target_fps=60, video_mode=True)
@app.route('/monitor', methods=['GET'])
def monitor():
"""获取屏幕数据并返回"""
def gen_img():
# 获取屏幕的全部区域
fps = 0 # 帧率
frame_count = 0 # 计算帧率,这是服务端生成的速率,到了客户端还会更低
start_time = time.perf_counter()
buffer = BytesIO()
while True:
reset_buffer(buffer) # 每次重置buffer,方便复用
t1 = time.perf_counter()
yield get_frame(buffer, fps)
print("get frame fps: ", 1.0 / (time.perf_counter() - t1))
frame_count += 1
elapsed_time = time.perf_counter() - start_time
if elapsed_time >= 1:
fps = frame_count // elapsed_time
frame_count = 0
start_time = time.perf_counter()
print("real frame fps: ", fps)
return Response(gen_img(), mimetype="multipart/x-mixed-replace; boundary=frame")
def get_frame(buffer, fps):
"""
获取一帧,然后处理成jpeg并返回二进制数据
"""
image_array = camera.get_latest_frame()
img = Image.fromarray(image_array)
if fps > 0:
img = add_fps(img, fps)
img.save(buffer, format="JPEG", quality=20)
return (b'--frame\r\n' +
b'Content-Type: image/jpeg\r\n\r\n' + buffer.getvalue() + b'\r\n')
def reset_buffer(buffer):
# 重置buffer,方便复用
buffer.truncate(0)
buffer.seek(0)
def add_fps(img, fps):
"""在图片上添加fps"""
draw = ImageDraw.Draw(img)
draw.text((100, 100), f"fps: {fps}", (255, 0, 0), ImageFont.truetype("arial.ttf", 100))
return img
if __name__ == '__main__':
print("monitoring...")
app.run("127.0.0.1", 9000)
到此这篇关于Python基于WebSocket实现简易屏幕共享工具的文章就介绍到这了,更多相关Python WebSocket屏幕共享内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
