python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > python流式输出

一文详解python如何实现流式输出

作者:账号已丢失ovo

这篇文章主要为大家详细介绍了python如何实现流式输出效果,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以跟随小编一起学习一下

一、创建fastapi的项目

1.创建虚拟环境(power shell)

python -m venv  venv

2.运行虚拟环境(command prompt)

venv\Scripts\activate

3.虚拟环境下安装fastapi

pip install fastapi

4.虚拟环境下安装uvicorn

pip install uvicorn

5.虚拟环境同目录创建app文件夹

6.app文件夹下创建main.py

7.配置main.py

from fastapi import FastAPI
app=FastAPI()
 if __name__ == "__main__":
  import uvicorn
  uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)

8.测试

uvicorn app.main:app --reload 

9.配置swagger

mian.py 添加如下代码

from fastapi import applications
from fastapi.openapi.docs import get_swagger_ui_html
def swagger_monkey_patch(*args, **kwargs):
    return get_swagger_ui_html(
        *args, **kwargs,
        swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.1.0/swagger-ui-bundle.min.js",
        swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.1.0/swagger-ui.min.css"
    )
applications.get_swagger_ui_html = swagger_monkey_patch  

10.测试

访问http://127.0.0.1:8000/docs 

二、一个简易循环发送

1、main.py代码如下

app=FastAPI()

@app.get("/",response_class=StreamingResponse)
async def start():
    return await send_sse()


app.include_router(sse.router, prefix="/v1")

if __name__ == "__main__":

    uvicorn.run(app, host="127.0.0.1", port=8000)

2、sse.py

async def event_generator():
    for i in range(5):
        # 标准 SSE 格式:data: 内容\n\n
        yield f"data: {{\"message\": \"Data chunk {i}\"}}\n\n".encode()
        await asyncio.sleep(1)  # 异步非阻塞延迟
# 返回体头
headers = {
                'Content-Type': 'text/event-stream',
                'Cache-Control': 'no-cache',
                'Connection': 'keep-alive'
            }
router = APIRouter()
@router.post("/completions")
async def send_sse():
    return StreamingResponse(event_generator(), headers=headers)

解释:整个代码很容易理解,调用send_see函数,其实工作中复杂的点在于,你是每一步都在发送。比如现在流行的fastgpt、dify这些基于工作流构建应用,每次返回都是节点信息,将每个节点的结果通过stream返回。

三、问题

1、如何动态实现发送?

2、return后我还有其他操作怎么办?

四、解决思路

题外话:解决问题的思路,才是你的成长,一味地依靠AI,永远在原地踏步。AI只是辅助,思路才是你的价值

问题一

因为只要return之后,只能通过event_generator函数进行操作,所以要让event_generator这个函数的循环是可控,因为外部无法直接调用event_generator。所以可以采用一个队列实现。

1、为什么是队列而不是集合?

因为要采用队列的先进先出的思想,保证数据的先后顺序。

2、是否队列为空,整个循环就结束呢?

不是,因为每个节点执行会有时间差,甚至说处理的比较慢从而导致,数据还没进队列,整个循环就已经结束了。

3、循环结束的节点怎么做?

根据业务来看,因为发送事件信息是有个event鉴别数据的类型,可以通过这个确定最后一个事件是什么从而结束整个循环。如果无法确定,可以设置具体的超时时间比如10s。

下面是具体的代码实现,以3s过期时间为例子。

async def event_generator(messags:deque):
    timeout_seconds = 3  # 从props获取超时时间,默认3秒
    last_data_time = asyncio.get_running_loop().time()
    start = True
    while start:
        # 标准 SSE 格式:data: 内容\n\n
        if messags:
            message = messags.popleft()
            yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode()
            last_data_time = asyncio.get_running_loop().time()  # 重置计时器
        else:
            # 检查是否超时
            current_time = asyncio.get_running_loop().time()
            if current_time - last_data_time > timeout_seconds:
                logging.info("消息列表已空 {} 秒,结束任务",timeout_seconds)
                break
        # 无数据时发送心跳,避免客户端断开连接
        await asyncio.sleep(0.5)  # 降低 CPU 使用率
# 返回体头
headers = {
                'Content-Type': 'text/event-stream',
                'Cache-Control': 'no-cache',
                'Connection': 'keep-alive'
            }
router = APIRouter()
messags = deque()
message=0
@router.post("/completions/send")
async def send_sse():
    return StreamingResponse(event_generator(messags), headers=headers)

@router.get("/completions/addDeque")
async def send_sse(): # 定义发送信息,每次对message+1操作
    global message
    message=message+1
    messags.append(message)
    return "ok"

效果图:

问题二

流式的想法,是每次调用把结果给到前端。那么问题是我们写这块代码是个很长的模块,在中间会进行流式输入,如果不return,所有的信息全部进了队列,最后return其实是一个一次性返回,跟流式的理念相违背。那么如何去做,这里可以采用携程去实现这个功能。 

我们可以把自己的代码块逻辑丢到携程让携程去做。整个思想逻辑,是用队列的延展性实现流式的输出。所以我们只需要保证,在发送的时候把数据给到队列就行。代码如下:

async def event_generator(messages:deque):
    timeout_seconds = 3  # 从props获取超时时间,默认3秒
    last_data_time = asyncio.get_running_loop().time()
    start = True
    while start:
        # 标准 SSE 格式:data: 内容\n\n
        if messages:
            message = messages.popleft()
            yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode()
            last_data_time = asyncio.get_running_loop().time()  # 重置计时器
        else:
            # 检查是否超时
            current_time = asyncio.get_running_loop().time()
            if current_time - last_data_time > timeout_seconds:
                logging.info("消息列表已空 {} 秒,结束任务",timeout_seconds)
                break
        # 无数据时发送心跳,避免客户端断开连接
        await asyncio.sleep(0.5)  # 降低 CPU 使用率
# 返回体头
headers = {
                'Content-Type': 'text/event-stream',
                'Cache-Control': 'no-cache',
                'Connection': 'keep-alive'
            }
router = APIRouter()

async def do_something(messages):
    # 操作一
    message1="节点1开始"
    messages.append(message1)
    #模拟每次操作耗时
    await asyncio.sleep(0.5)
    message2="节点1answer"
    messages.append(message2)
    #模拟每次操作耗时
    await asyncio.sleep(0.5)
    message2="节点1结束"
    messages.append(message2)
    #模拟每次操作耗时
    await asyncio.sleep(0.5)


@router.post("/completions/send")
async def send_sse():
    messages = deque()
    asyncio.create_task(do_something(messages))
    return StreamingResponse(event_generator(messages), headers=headers)

:messages是局部变量,这样可以保证,线程安全。

结果如下:

以上就是一文详解python如何实现流式输出的详细内容,更多关于python流式输出的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文