Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Golang中SSE协议实现

Golang中SSE协议实现方法

作者:shuix1ng

Gin框架通过Server-SentEvents(SSE)实现流式返回,关键步骤包括设置正确的响应头、使用Flusher确保数据立即发送、处理客户端中断和错误,本文给大家介绍Golang中SSE协议实现方法,感兴趣的朋友跟随小编一起看看吧

Gin流式返回实现方法

🧑‍💻 User

gin框架如何做到流式返回

🤖 Assistant

在 Gin 框架中实现流式返回主要通过以下几个关键技术点:

1. 使用 Server-Sent Events (SSE)

SSE 是实现流式返回的常用方式:

package main
import (
    "fmt"
    "net/http"
    "time"
    "github.com/gin-gonic/gin"
)
func main() {
    r := gin.Default()
    r.GET("/stream", func(c *gin.Context) {
        // 设置响应头
        c.Header("Content-Type", "text/event-stream")
        c.Header("Cache-Control", "no-cache")
        c.Header("Connection", "keep-alive")
        c.Header("Access-Control-Allow-Origin", "*")
        // 创建通道用于发送数据
        messageChan := make(chan string)
        // 模拟数据生成
        go func() {
            defer close(messageChan)
            for i := 0; i < 10; i++ {
                message := fmt.Sprintf("消息 %d: 当前时间 %v", i, time.Now().Format("15:04:05"))
                messageChan <- message
                time.Sleep(1 * time.Second)
            }
        }()
        // 流式返回数据
        c.Stream(func(w io.Writer) bool {
            if message, ok := <-messageChan; ok {
                // SSE 格式: data: {message}\n\n
                c.SSEvent("message", message)
                return true
            }
            return false
        })
    })
    r.Run(":8080")
}

2. 手动控制响应流

更底层的方式是直接操作 ResponseWriter:

r.GET("/stream-data", func(c *gin.Context) {
    c.Header("Content-Type", "text/plain")
    c.Header("Transfer-Encoding", "chunked")
    flusher, ok := c.Writer.(http.Flusher)
    if !ok {
        c.String(http.StatusInternalServerError, "Streaming not supported")
        return
    }
    for i := 0; i < 10; i++ {
        fmt.Fprintf(c.Writer, "数据块 %d\n", i)
        flusher.Flush() // 立即发送到客户端
        time.Sleep(1 * time.Second)
    }
})

3. 结合上下文取消机制

处理客户端中断连接的情况:

r.GET("/stream-with-context", func(c *gin.Context) {
    c.Header("Content-Type", "text/event-stream")
    c.Header("Cache-Control", "no-cache")
    c.Header("Connection", "keep-alive")
    flusher, _ := c.Writer.(http.Flusher)
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-c.Request.Context().Done():
            // 客户端断开连接
            fmt.Println("客户端断开连接")
            return
        case t := <-ticker.C:
            data := fmt.Sprintf("时间: %s", t.Format("15:04:05"))
            c.SSEvent("update", data)
            flusher.Flush()
        }
    }
})

4. JSON 流式返回

对于结构化数据的流式返回:

r.GET("/stream-json", func(c *gin.Context) {
    c.Header("Content-Type", "application/x-ndjson") // 换行分隔的JSON
    flusher, _ := c.Writer.(http.Flusher)
    for i := 0; i < 5; i++ {
        data := gin.H{
            "id":   i,
            "time": time.Now().Unix(),
            "data": fmt.Sprintf("项目 %d", i),
        }
        c.Render(-1, gin.JSON{Data: data})
        fmt.Fprintln(c.Writer) // 添加换行符
        flusher.Flush()
        time.Sleep(1 * time.Second)
    }
})

5. 文件流式下载

大文件的流式下载:

r.GET("/download-large-file", func(c *gin.Context) {
    filePath := "./large-file.txt"
    c.Header("Content-Type", "application/octet-stream")
    c.Header("Content-Disposition", "attachment; filename=large-file.txt")
    file, err := os.Open(filePath)
    if err != nil {
        c.String(http.StatusNotFound, "文件不存在")
        return
    }
    defer file.Close()
    // 获取文件信息
    fileInfo, _ := file.Stat()
    c.Header("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
    // 流式传输文件内容
    io.Copy(c.Writer, file)
})

6. 前端示例

对应的前端代码:

<!DOCTYPE html>
<html>
<head>
    <title>SSE 示例</title>
</head>
<body>
    <div id="messages"></div>
    <script>
        const eventSource = new EventSource('/stream');
        eventSource.onmessage = function(event) {
            const messageDiv = document.createElement('div');
            messageDiv.textContent = event.data;
            document.getElementById('messages').appendChild(messageDiv);
        };
        eventSource.onerror = function(event) {
            console.error('SSE 错误:', event);
        };
    </script>
</body>
</html>

关键要点

这种方式特别适合实时数据推送、大文件下载、长时间运行的任务进度更新等场景。

🧑‍💻 User

这个服务是如何返回的:

data: {"id":"019b84c169efb22b959b99a7914e4712","object":"chat.completion.chunk","created":1767458826,"model":"deepseek-ai/DeepSeek-V3.2","choices":[{"index":0,"delta":{"content":"","reasoning_content":null,"role":"assistant"},"finish_reason":"stop"}],"system_fingerprint":"","usage":{"prompt_tokens":23,"completion_tokens":19,"total_tokens":42,"completion_tokens_details":{"reasoning_tokens":0}}}

data: [DONE]

🤖 Assistant

这是一个典型的 Server-Sent Events (SSE) 流式响应格式。让我详细解析这种返回方式的工作原理:

响应格式解析

// 每条数据都以 "data: " 开头,后跟 JSON 数据
data: {"id":"019b84c169efb22b959b99a7914e4712","object":"chat.completion.chunk","created":1767458826,"model":"deepseek-ai/DeepSeek-V3.2","choices":[{"index":0,"delta":{"content":"服务。","reasoning_content":null,"role":"assistant"},"finish_reason":null}],"usage":{...}}
// 最后以 [DONE] 结束
data: [DONE]

Gin 框架实现代码示例

package main
import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
    "github.com/gin-gonic/gin"
)
func main() {
    r := gin.Default()
    r.GET("/chat/stream", func(c *gin.Context) {
        // 设置 SSE 响应头
        c.Header("Content-Type", "text/event-stream")
        c.Header("Cache-Control", "no-cache")
        c.Header("Connection", "keep-alive")
        c.Header("Access-Control-Allow-Origin", "*")
        // 检查是否支持流式传输
        flusher, ok := c.Writer.(http.Flusher)
        if !ok {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming not supported"})
            return
        }
        // 模拟 AI 对话的逐词返回
        messages := []string{"服务。", "请问有什么", "可以帮", "您的吗", "?那我", "先退", "下了。"}
        for i, message := range messages {
            // 构建响应数据
            response := gin.H{
                "id":      "019b84c169efb22b959b99a7914e4712",
                "object":  "chat.completion.chunk",
                "created": time.Now().Unix(),
                "model":   "deepseek-ai/DeepSeek-V3.2",
                "choices": []gin.H{
                    {
                        "index": 0,
                        "delta": gin.H{
                            "content":         message,
                            "reasoning_content": nil,
                            "role":           "assistant",
                        },
                        "finish_reason": nil,
                    },
                },
                "usage": gin.H{
                    "prompt_tokens":     23,
                    "completion_tokens": 5 + i*2, // 模拟token计数
                    "total_tokens":      28 + i*2,
                    "completion_tokens_details": gin.H{
                        "reasoning_tokens": 0,
                    },
                },
            }
            // 转换为 JSON
            jsonData, _ := json.Marshal(response)
            // 按照 SSE 格式写入:data: {json}\n\n
            fmt.Fprintf(c.Writer, "data: %s\n\n", string(jsonData))
            flusher.Flush() // 立即刷新到客户端
            time.Sleep(200 * time.Millisecond) // 模拟处理延迟
        }
        // 发送结束标志
        fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
        flusher.Flush()
    })
    r.Run(":8080")
}

更完整的实现(包含错误处理)

func streamingChatHandler(c *gin.Context) {
    // SSE 头设置
    c.Header("Content-Type", "text/event-stream")
    c.Header("Cache-Control", "no-cache")
    c.Header("Connection", "keep-alive")
    c.Header("Access-Control-Allow-Origin", "*")
    flusher, ok := c.Writer.(http.Flusher)
    if !ok {
        c.JSON(http.StatusInternalServerError, gin.H{"error": "Streaming not supported"})
        return
    }
    // 监听客户端断开连接
    ctx := c.Request.Context()
    // 使用 Gin 的 Stream 方法(推荐)
    c.Stream(func(w io.Writer) bool {
        select {
        case <-ctx.Done():
            return false // 客户端断开,停止流式传输
        default:
            // 生成聊天数据
            chunk := generateChatChunk()
            if chunk == nil {
                // 发送结束信号
                c.SSEvent("", "[DONE]")
                return false
            }
            // 发送数据块
            c.SSEvent("", chunk)
            return true
        }
    })
}
func generateChatChunk() interface{} {
    // 这里可以连接真实的 AI 模型
    // 返回数据块结构
    return map[string]interface{}{
        "id":      generateID(),
        "object":  "chat.completion.chunk",
        "choices": []map[string]interface{}{
            {
                "delta": map[string]interface{}{
                    "content": getNextWord(),
                },
            },
        },
    }
}

前端接收示例

<!DOCTYPE html>
<html>
<head>
    <title>SSE Chat</title>
</head>
<body>
    <div id="chat-output"></div>
    <script>
        const chatOutput = document.getElementById('chat-output');
        const eventSource = new EventSource('/chat/stream');
        eventSource.onmessage = function(event) {
            if (event.data === '[DONE]') {
                eventSource.close();
                return;
            }
            try {
                const data = JSON.parse(event.data);
                if (data.choices && data.choices[0].delta.content) {
                    chatOutput.innerHTML += data.choices[0].delta.content;
                }
            } catch (error) {
                console.error('解析错误:', error);
            }
        };
        eventSource.onerror = function(event) {
            console.error('SSE 错误:', event);
            eventSource.close();
        };
    </script>
</body>
</html>

关键技术点

这种模式广泛应用于 ChatGPT、Claude 等 AI 服务的流式对话功能。

🧑‍💻 User

golang发起请求是流式输出的时候可不可以支持流式print出来,而不是等待全部结束才输出

🤖 Assistant

当然可以!Golang 发起请求时也可以实现流式输出,而不是等待全部结束才输出。以下是几种实现方式:

1. 使用 HTTP 标准库实现流式读取

package main
import (
    "bufio"
    "fmt"
    "io"
    "net/http"
    "time"
)
func main() {
    // 创建请求
    req, err := http.NewRequest("GET", "http://localhost:8080/chat/stream", nil)
    if err != nil {
        panic(err)
    }
    // 设置接受流式响应
    req.Header.Set("Accept", "text/event-stream")
    req.Header.Set("Cache-Control", "no-cache")
    // 发送请求
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // 创建读取器
    reader := bufio.NewReader(resp.Body)
    fmt.Println("开始接收流式响应:")
    for {
        // 逐行读取
        line, err := reader.ReadBytes('\n')
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Printf("读取错误: %v\n", err)
            break
        }
        // 处理 SSE 格式的数据行
        lineStr := string(line)
        if len(lineStr) > 6 && lineStr[:6] == "data: " {
            data := lineStr[6:]
            if data == "[DONE]\n" {
                fmt.Println("\n流式传输结束")
                break
            }
            // 这里可以解析 JSON 并提取内容
            fmt.Printf("收到数据块: %s", data)
        }
    }
}

2. 完整的 SSE 客户端实现

package main
import (
    "bufio"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "strings"
)
type StreamResponse struct {
    ID      string `json:"id"`
    Object  string `json:"object"`
    Choices []struct {
        Delta struct {
            Content string `json:"content"`
        } `json:"delta"`
    } `json:"choices"`
}
func main() {
    streamSSE("http://localhost:8080/chat/stream")
}
func streamSSE(url string) {
    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("Accept", "text/event-stream")
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        fmt.Printf("请求错误: %v\n", err)
        return
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        fmt.Printf("HTTP错误: %s\n", resp.Status)
        return
    }
    scanner := bufio.NewScanner(resp.Body)
    var fullResponse strings.Builder
    fmt.Print("AI回复: ")
    for scanner.Scan() {
        line := scanner.Text()
        if line == "" {
            continue // 跳过空行
        }
        if strings.HasPrefix(line, "data: ") {
            data := line[6:]
            if data == "[DONE]" {
                fmt.Println("\n\n=== 流式传输完成 ===")
                break
            }
            // 解析 JSON
            var chunk StreamResponse
            if err := json.Unmarshal([]byte(data), &chunk); err != nil {
                fmt.Printf("JSON解析错误: %v\n", err)
                continue
            }
            // 输出内容
            if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != "" {
                content := chunk.Choices[0].Delta.Content
                fmt.Print(content)
                fullResponse.WriteString(content)
            }
        }
    }
    if err := scanner.Err(); err != nil {
        fmt.Printf("读取流错误: %v\n", err)
    }
    fmt.Printf("\n完整回复: %s\n", fullResponse.String())
}

3. 使用 Context 支持取消操作

package main
import (
    "bufio"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"
)
func main() {
    // 创建可取消的上下文
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    // 监听中断信号
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-sigCh
        fmt.Println("\n接收到中断信号,停止流式接收...")
        cancel()
    }()
    // 启动流式接收
    if err := streamWithContext(ctx, "http://localhost:8080/chat/stream"); err != nil {
        fmt.Printf("错误: %v\n", err)
    }
}
func streamWithContext(ctx context.Context, url string) error {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return err
    }
    req.Header.Set("Accept", "text/event-stream")
    client := &http.Client{
        Timeout: 0, // 无超时限制
    }
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    // 创建带缓冲的读取器
    reader := bufio.NewReader(resp.Body)
    buffer := make([]byte, 1024)
    fmt.Println("开始流式接收 (按Ctrl+C停止):")
    fmt.Print("> ")
    for {
        select {
        case <-ctx.Done():
            fmt.Println("\n用户主动停止")
            return nil
        default:
            // 非阻塞读取
            n, err := reader.Read(buffer)
            if err != nil {
                if err == io.EOF {
                    fmt.Println("\n流结束")
                    return nil
                }
                return err
            }
            if n > 0 {
                data := string(buffer[:n])
                processSSEData(data)
            }
            time.Sleep(50 * time.Millisecond) // 避免CPU占用过高
        }
    }
}
func processSSEData(data string) {
    lines := strings.Split(data, "\n")
    for _, line := range lines {
        line = strings.TrimSpace(line)
        if strings.HasPrefix(line, "data: ") {
            content := line[6:]
            if content == "[DONE]" {
                fmt.Println("\n[传输完成]")
                return
            }
            // 简单的文本输出(可根据需要解析JSON)
            fmt.Print(content)
        }
    }
}

4. 使用第三方库简化操作

package main
import (
    "fmt"
    "github.com/r3labs/sse"
)
func main() {
    client := sse.NewClient("http://localhost:8080/chat/stream")
    fmt.Println("使用SSE库接收流式响应:")
    // 订阅事件
    err := client.Subscribe("message", func(msg *sse.Event) {
        // 收到消息时立即处理
        fmt.Printf("收到消息: %s\n", string(msg.Data))
    })
    if err != nil {
        fmt.Printf("订阅错误: %v\n", err)
    }
    // 保持运行
    select {}
}

5. 简单的命令行聊天客户端示例

package main
import (
    "bufio"
    "encoding/json"
    "fmt"
    "net/http"
    "os"
    "strings"
)
func main() {
    reader := bufio.NewReader(os.Stdin)
    for {
        fmt.Print("你: ")
        input, _ := reader.ReadString('\n')
        input = strings.TrimSpace(input)
        if input == "exit" {
            break
        }
        // 发送请求并流式接收回复
        streamChatResponse(input)
        fmt.Println()
    }
}
func streamChatResponse(prompt string) {
    url := fmt.Sprintf("http://localhost:8080/chat?message=%s", prompt)
    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("Accept", "text/event-stream")
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        fmt.Printf("请求错误: %v\n", err)
        return
    }
    defer resp.Body.Close()
    scanner := bufio.NewScanner(resp.Body)
    fmt.Print("AI: ")
    for scanner.Scan() {
        line := scanner.Text()
        if strings.HasPrefix(line, "data: ") {
            data := line[6:]
            if data == "[DONE]" {
                break
            }
            var result map[string]interface{}
            if json.Unmarshal([]byte(data), &result) == nil {
                if choices, ok := result["choices"].([]interface{}); ok && len(choices) > 0 {
                    if choice, ok := choices[0].(map[string]interface{}); ok {
                        if delta, ok := choice["delta"].(map[string]interface{}); ok {
                            if content, ok := delta["content"].(string); ok {
                                fmt.Print(content)
                            }
                        }
                    }
                }
            }
        }
    }
}

关键要点

这样就能实现类似 ChatGPT 的流式输出效果,每个数据块到达时立即显示,而不是等待全部完成。

到此这篇关于Golang中SSE协议实现的文章就介绍到这了,更多相关Golang中SSE协议实现内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文