python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > FastAPI开发Excel数据分析API

基于FastAPI与LangChain开发Excel智能数据分析API详解

作者:东方佑

本文将详细介绍如何使用FastAPI和LangChain构建一个支持流式响应的Excel智能数据分析API,实现对结构化数据的自然语言查询与对话式分析,需要的可以了解下

一、项目概述与核心功能

在现代数据驱动决策的环境中,让非技术用户能够通过自然语言与数据进行交互变得越来越重要。本项目开发了一个基于RESTful API的智能数据分析服务,具有以下核心功能:

二、技术架构与工具选择

本项目采用了以下技术栈,每项技术都承担着特定角色:

技术组件版本要求职责说明
FastAPI>=0.68.0高性能Web框架,提供API服务
LangChain>=0.12.0智能代理和链式处理核心
Pandas>=1.3.0DataFrame数据处理和分析
Uvicorn>=0.15.0ASGI服务器,用于运行FastAPI
Tongyi Qwen-大语言模型,提供自然语言理解能力

技术选型理由

三、核心实现细节

3.1 FastAPI应用初始化

首先创建FastAPI应用实例,并设置元数据信息:

from fastapi import FastAPI

app = FastAPI(
    title="Excel Data Analysis API",
    description="基于Pandas DataFrame的数据分析API",
    version="1.0.0"
)

FastAPI基于Python类型提示自动生成API文档,支持OpenAPI标准,开发者可以通过/docs/redoc端点访问交互式文档。

3.2 数据加载与代理初始化

项目启动时自动加载Excel数据并初始化LangChain代理:

@app.on_event("startup")
async def startup_event():
    """应用启动时初始化模型和代理"""
    try:
        initialize_agent()
        print("模型和代理初始化成功")
    except Exception as e:
        print(f"初始化失败: {e}")
        raise

数据加载过程支持多工作表Excel文件,自动合并所有工作表到一个DataFrame中:

def initialize_agent():
    global llm, pandas_agent
    
    # 读取Excel文件中的所有工作表
    excel_file = pd.ExcelFile(path)
    all_sheets = {}
    for sheet_name in excel_file.sheet_names:
        all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)
    
    # 合并所有工作表
    df = pd.concat(all_sheets.values(), ignore_index=True)

这种方法确保了无论Excel文件的结构如何,都能完整地加载所有数据。

3.3 LangChain代理创建

使用LangChain的create_pandas_dataframe_agent创建专门用于Pandas数据处理的AI代理:

pandas_agent = create_pandas_dataframe_agent(
    llm,  # 通义千问模型实例
    df,   # 合并后的DataFrame
    verbose=True,  # 启用详细日志
    agent_type="zero-shot-react-description",  # 代理类型
    allow_dangerous_code=True,  # 允许执行代码
    max_iterations=5  # 最大迭代次数
)

这种代理结合了大语言模型的语言理解能力和Pandas的数据处理能力,能够理解自然语言查询并将其转换为DataFrame操作。

3.4 流式响应实现

为实现真正的流式输出,创建了自定义回调处理器:

class StreamingCallbackHandler(BaseCallbackHandler):
    """自定义回调处理器,用于实现真正的流式输出"""

    def __init__(self):
        self.tokens = []
        self.finished = False

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """当LLM生成新token时调用"""
        self.tokens.append(token)

    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        """当LLM生成结束时调用"""
        self.finished = True

流式响应接口使用Server-Sent Events(SSE)技术:

@app.post("/chat", response_model=ChatResponse)
async def chat_with_data(request: ChatRequest):
    # ... 省略其他代码 ...
    
    if request.stream:
        return StreamingResponse(
            true_stream_response(full_input, session_history, request.session_id),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            }
        )

这种方式相比传统响应,能够实时地将生成的token发送给客户端,大幅减少用户感知的延迟。

3.5 会话管理机制

为实现多用户支持,实现了基于session_id的会话管理:

conversation_history = {}

# 在聊天接口中维护会话历史
session_history = conversation_history.get(request.session_id, [])
session_history.append(request.message)
conversation_history[request.session_id] = session_history

还提供了清除历史记录的端点:

@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):
    """清除指定会话的历史记录"""
    if session_id in conversation_history:
        del conversation_history[session_id]
    return {"message": f"会话 {session_id} 的历史记录已清除"}

这种设计允许不同用户或不同对话线程保持独立的上下文历史。

四、API端点设计

本项目实现了以下RESTful端点:

端点方法描述参数
/chatPOST主聊天接口,支持流式和非流式响应session_id, message, history_length, stream
/clear_history/{session_id}DELETE清除指定会话的历史记录session_id
/healthGET健康检查端点
/stream_testGET流式接口测试端点

主要请求和响应模型

class ChatRequest(BaseModel):
    session_id: str  # 会话ID,用于区分不同用户的对话历史
    message: str  # 用户消息
    history_length: Optional[int] = 5  # 历史消息长度,默认为5
    stream: Optional[bool] = False  # 是否使用流式响应

class ChatResponse(BaseModel):
    session_id: str
    response: str
    success: bool
    error: Optional[str] = None

五、部署与性能优化

5.1 生产环境部署

使用Uvicorn作为ASGI服务器部署应用:

uvicorn main:app --host 0.0.0.0 --port 9113 --workers 4 --timeout-keep-alive 300

参数说明

5.2 性能优化建议

六、应用场景与扩展方向

6.1 典型应用场景

6.2 扩展方向

七、总结与展望

本项目展示了如何将FastAPI、LangChain和大语言模型结合,构建一个功能强大的智能数据分析API。关键优势包括:

未来发展方向

通过这种技术组合,我们能够将先进的大语言模型能力转化为实用的企业级应用,真正实现"用自然语言与数据对话"的愿景。

完整代码

# main.py
import pandas as pd
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_experimental.agents import create_pandas_dataframe_agent
import os
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from typing import List, Optional
from fastapi.responses import StreamingResponse
import asyncio
import json
import time
from langchain.callbacks.base import BaseCallbackHandler
from typing import Any, Dict, List
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

# 初始化 FastAPI 应用
app = FastAPI(title="Excel Data Analysis API", description="基于 Pandas DataFrame 的数据分析 API")

# 全局变量存储模型和代理
llm = None
pandas_agent = None
conversation_history = {}


class StreamingCallbackHandler(BaseCallbackHandler):
    """自定义回调处理器,用于实现真正的流式输出"""

    def __init__(self):
        self.tokens = []
        self.finished = False

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """当LLM生成新token时调用"""
        self.tokens.append(token)

    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        """当LLM生成结束时调用"""
        self.finished = True

    def get_tokens(self):
        """获取已生成的tokens"""
        return self.tokens


class ChatRequest(BaseModel):
    session_id: str  # 会话ID,用于区分不同用户的对话历史
    message: str  # 用户消息
    history_length: Optional[int] = 5  # 历史消息长度,默认为5
    stream: Optional[bool] = False  # 是否使用流式响应


class ChatResponse(BaseModel):
    session_id: str
    response: str
    success: bool
    error: Optional[str] = None


def initialize_agent():
    """初始化模型和数据代理"""
    global llm, pandas_agent

    path = r'./data/1.xlsx'

    # 检查文件是否存在
    if not os.path.exists(path):
        raise FileNotFoundError(f"文件未找到:{path}")

    # 读取 Excel 中的所有工作表
    excel_file = pd.ExcelFile(path)
    all_sheets = {}
    for sheet_name in excel_file.sheet_names:
        all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)

    # 合并所有工作表到一个DataFrame中
    df = pd.concat(all_sheets.values(), ignore_index=True)

    os.environ["DASHSCOPE_API_KEY"] = 'sk-50254f6d4df1ab3c9baf30093c4e'

    llm = ChatTongyi(
        model="qwen-max-latest",
        temperature=0.4,
        streaming=True  # 启用流式输出
    )

    # 创建excel_agent
    pandas_agent = create_pandas_dataframe_agent(
        llm,
        df,
        verbose=True,
        agent_type="zero-shot-react-description",
        allow_dangerous_code=True,
        max_iterations=5
    )


@app.on_event("startup")
async def startup_event():
    """应用启动时初始化模型和代理"""
    try:
        initialize_agent()
        print("模型和代理初始化成功")
    except Exception as e:
        print(f"初始化失败: {e}")
        raise


@app.post("/chat", response_model=ChatResponse)
async def chat_with_data(request: ChatRequest):
    """与数据进行对话(支持流式和非流式响应)"""
    global pandas_agent, conversation_history

    if not pandas_agent:
        raise HTTPException(status_code=500, detail="模型未初始化")

    try:
        # 获取或创建会话历史
        session_history = conversation_history.get(request.session_id, [])

        # 构建历史文本
        history_text = "\n".join(session_history[-request.history_length:])

        # 构造带上下文的输入
        if history_text:
            full_input = f"聊天历史:{history_text},当前问题:{request.message},请根据历史和当前问题,不要截断输出,展示全部内容,不要总结,严格按照查询内容输出,不要多余输出,如果无法确定指代对象,请询问用户澄清,并且不要编造。".strip()
        else:
            full_input = request.message

        # 如果请求流式响应
        if request.stream:
            return StreamingResponse(
                true_stream_response(full_input, session_history, request.session_id),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                }
            )

        # 非流式响应
        response = pandas_agent.invoke({"input": full_input})
        output = response['output'] if isinstance(response, dict) else str(response)

        # 更新会话历史
        session_history.append(request.message)
        conversation_history[request.session_id] = session_history

        return ChatResponse(
            session_id=request.session_id,
            response=output,
            success=True
        )

    except Exception as e:
        if request.stream:
            # 对于流式请求,返回流式错误响应
            async def error_stream():
                yield f"data: {json.dumps({'error': str(e)})}\n\n"

            return StreamingResponse(error_stream(), media_type="text/event-stream")
        else:
            return ChatResponse(
                session_id=request.session_id,
                response="",
                success=False,
                error=str(e)
            )


async def true_stream_response(input_text: str, session_history: list, session_id: str):
    """真正的流式响应生成器"""
    global pandas_agent, conversation_history

    try:
        # 使用回调处理器捕获流式输出
        callback_handler = StreamingCallbackHandler()

        # 在后台运行代理处理
        async def run_agent():
            try:
                # 调用代理,传入回调处理器
                response = await asyncio.get_event_loop().run_in_executor(
                    None,
                    lambda: pandas_agent.invoke(
                        {"input": input_text},
                        {"callbacks": [callback_handler]}
                    )
                )

                # 确保结束标记被设置
                callback_handler.finished = True

            except Exception as e:
                print(f"代理执行错误: {e}")
                callback_handler.finished = True

        # 启动代理任务
        agent_task = asyncio.create_task(run_agent())

        # 流式输出循环
        last_token_count = 0
        start_time = time.time()
        max_wait_time = 60  # 最大等待时间60秒

        while not callback_handler.finished and (time.time() - start_time) < max_wait_time:
            current_tokens = callback_handler.get_tokens()

            # 如果有新token,发送给客户端
            if len(current_tokens) > last_token_count:
                for i in range(last_token_count, len(current_tokens)):
                    token_data = {
                        "token": current_tokens[i],
                        "type": "token"
                    }
                    yield f"data: {json.dumps(token_data)}\n\n"

                last_token_count = len(current_tokens)

            # 短暂等待后继续检查
            await asyncio.sleep(0.1)

        # 如果超时,发送错误信息
        if (time.time() - start_time) >= max_wait_time:
            error_data = {"error": "请求超时", "type": "error"}
            yield f"data: {json.dumps(error_data)}\n\n"
        else:
            # 发送完成标记
            done_data = {"done": True, "type": "done"}
            yield f"data: {json.dumps(done_data)}\n\n"

            # 更新会话历史
            current_message = input_text.split("当前问题:")[1].split(",")[
                0] if "当前问题:" in input_text else input_text
            session_history.append(current_message)
            conversation_history[session_id] = session_history

        # 等待代理任务完成(如果还未完成)
        if not agent_task.done():
            agent_task.cancel()

    except Exception as e:
        error_data = {"error": f"流式处理错误: {str(e)}", "type": "error"}
        yield f"data: {json.dumps(error_data)}\n\n"


@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):
    """清除指定会话的历史记录"""
    if session_id in conversation_history:
        del conversation_history[session_id]
    return {"message": f"会话 {session_id} 的历史记录已清除"}


@app.get("/health")
async def health_check():
    """健康检查接口"""
    return {
        "status": "healthy",
        "model_loaded": pandas_agent is not None,
        "active_sessions": len(conversation_history)
    }


@app.get("/stream_test")
async def stream_test():
    """测试流式接口"""

    async def generate_test_data():
        for i in range(10):
            yield f"data: 测试消息 {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(generate_test_data(), media_type="text/event-stream")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=9113)  

以上就是基于FastAPI与LangChain开发Excel智能数据分析API详解的详细内容,更多关于FastAPI开发Excel数据分析API的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文