一文详解python如何实现流式输出
作者:账号已丢失ovo
一、创建fastapi的项目
1.创建虚拟环境(power shell)
python -m venv venv
2.运行虚拟环境(command prompt)
venv\Scripts\activate
3.虚拟环境下安装fastapi
pip install fastapi
4.虚拟环境下安装uvicorn
pip install uvicorn
5.虚拟环境同目录创建app文件夹
6.app文件夹下创建main.py
7.配置main.py
from fastapi import FastAPI app=FastAPI() if __name__ == "__main__": import uvicorn uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)
8.测试
uvicorn app.main:app --reload
9.配置swagger
mian.py 添加如下代码
from fastapi import applications from fastapi.openapi.docs import get_swagger_ui_html def swagger_monkey_patch(*args, **kwargs): return get_swagger_ui_html( *args, **kwargs, swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.1.0/swagger-ui-bundle.min.js", swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.1.0/swagger-ui.min.css" ) applications.get_swagger_ui_html = swagger_monkey_patch
10.测试
访问http://127.0.0.1:8000/docs
二、一个简易循环发送
1、main.py代码如下
app=FastAPI() @app.get("/",response_class=StreamingResponse) async def start(): return await send_sse() app.include_router(sse.router, prefix="/v1") if __name__ == "__main__": uvicorn.run(app, host="127.0.0.1", port=8000)
2、sse.py
async def event_generator(): for i in range(5): # 标准 SSE 格式:data: 内容\n\n yield f"data: {{\"message\": \"Data chunk {i}\"}}\n\n".encode() await asyncio.sleep(1) # 异步非阻塞延迟 # 返回体头 headers = { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' } router = APIRouter() @router.post("/completions") async def send_sse(): return StreamingResponse(event_generator(), headers=headers)
解释:整个代码很容易理解,调用send_see函数,其实工作中复杂的点在于,你是每一步都在发送。比如现在流行的fastgpt、dify这些基于工作流构建应用,每次返回都是节点信息,将每个节点的结果通过stream返回。
三、问题
1、如何动态实现发送?
2、return后我还有其他操作怎么办?
四、解决思路
题外话:解决问题的思路,才是你的成长,一味地依靠AI,永远在原地踏步。AI只是辅助,思路才是你的价值
问题一
因为只要return之后,只能通过event_generator函数进行操作,所以要让event_generator这个函数的循环是可控,因为外部无法直接调用event_generator。所以可以采用一个队列实现。
1、为什么是队列而不是集合?
因为要采用队列的先进先出的思想,保证数据的先后顺序。
2、是否队列为空,整个循环就结束呢?
不是,因为每个节点执行会有时间差,甚至说处理的比较慢从而导致,数据还没进队列,整个循环就已经结束了。
3、循环结束的节点怎么做?
根据业务来看,因为发送事件信息是有个event鉴别数据的类型,可以通过这个确定最后一个事件是什么从而结束整个循环。如果无法确定,可以设置具体的超时时间比如10s。
下面是具体的代码实现,以3s过期时间为例子。
async def event_generator(messags:deque): timeout_seconds = 3 # 从props获取超时时间,默认3秒 last_data_time = asyncio.get_running_loop().time() start = True while start: # 标准 SSE 格式:data: 内容\n\n if messags: message = messags.popleft() yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode() last_data_time = asyncio.get_running_loop().time() # 重置计时器 else: # 检查是否超时 current_time = asyncio.get_running_loop().time() if current_time - last_data_time > timeout_seconds: logging.info("消息列表已空 {} 秒,结束任务",timeout_seconds) break # 无数据时发送心跳,避免客户端断开连接 await asyncio.sleep(0.5) # 降低 CPU 使用率 # 返回体头 headers = { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' } router = APIRouter() messags = deque() message=0 @router.post("/completions/send") async def send_sse(): return StreamingResponse(event_generator(messags), headers=headers) @router.get("/completions/addDeque") async def send_sse(): # 定义发送信息,每次对message+1操作 global message message=message+1 messags.append(message) return "ok"
效果图:
问题二
流式的想法,是每次调用把结果给到前端。那么问题是我们写这块代码是个很长的模块,在中间会进行流式输入,如果不return,所有的信息全部进了队列,最后return其实是一个一次性返回,跟流式的理念相违背。那么如何去做,这里可以采用携程去实现这个功能。
我们可以把自己的代码块逻辑丢到携程让携程去做。整个思想逻辑,是用队列的延展性实现流式的输出。所以我们只需要保证,在发送的时候把数据给到队列就行。代码如下:
async def event_generator(messages:deque): timeout_seconds = 3 # 从props获取超时时间,默认3秒 last_data_time = asyncio.get_running_loop().time() start = True while start: # 标准 SSE 格式:data: 内容\n\n if messages: message = messages.popleft() yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode() last_data_time = asyncio.get_running_loop().time() # 重置计时器 else: # 检查是否超时 current_time = asyncio.get_running_loop().time() if current_time - last_data_time > timeout_seconds: logging.info("消息列表已空 {} 秒,结束任务",timeout_seconds) break # 无数据时发送心跳,避免客户端断开连接 await asyncio.sleep(0.5) # 降低 CPU 使用率 # 返回体头 headers = { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' } router = APIRouter() async def do_something(messages): # 操作一 message1="节点1开始" messages.append(message1) #模拟每次操作耗时 await asyncio.sleep(0.5) message2="节点1answer" messages.append(message2) #模拟每次操作耗时 await asyncio.sleep(0.5) message2="节点1结束" messages.append(message2) #模拟每次操作耗时 await asyncio.sleep(0.5) @router.post("/completions/send") async def send_sse(): messages = deque() asyncio.create_task(do_something(messages)) return StreamingResponse(event_generator(messages), headers=headers)
注:messages是局部变量,这样可以保证,线程安全。
结果如下:
以上就是一文详解python如何实现流式输出的详细内容,更多关于python流式输出的资料请关注脚本之家其它相关文章!