java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Springboot SSE推送

Springboot使用SSE推送消息到客户端的实现

作者:爱吃的强哥

文章浏览阅读246次,点赞4次,收藏4次,本文介绍了在SpringBoot中实现SSE(Server-Sent Events)服务端推送功能,用于Electron桌面应用的消息推送,感兴趣的可以了解一下

1、场景:

服务端主动推动消息到客户端(Electron 桌面应用)

普通的HTTP/HTTPS请求要先客户端发送请求,然后服务端才能返回结果

服务端主动推送数据到客户端,有两种方案:

SSE:只能从服务端主动推送数据到客户端(单向),客户端发送数据还是要使用HTTP/HTTPS请求

Socket 通信:客户端和服务端都可以发送数据给对方(双向)

先记录SSE的配置(我的服务端之前是用nodejs express 写的,已经使用 socket.io与客户端适配好了,现在用Springboot重构)

2、实现:

1、springboot 配置:

1、先实现Server层,方便其他控制器调用

package com.xxx.controller;
 
import com.qiang.service.SseService;
import com.qiang.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
@RestController
@RequestMapping("/sse")
public class SseController {
 
    // 注入 SSE 服务类
    @Autowired
    private SseService sseService;
    @Autowired
    private UserService userService;
 
    /**
     * 建立 SSE 连接(调用 Service 的注册方法)
     */
    @GetMapping("/connect")
    public SseEmitter connect(@RequestParam String clientId) {
        // 查询会话成员表,userId通过客户端id获取该用户所有的群聊
        String userId = clientId.split("_")[2].trim();
        List<String> groupIdList = userService.getUserGroupChat(userId);
        Set<String> groupIdSet = new HashSet<>();
        if (groupIdList != null && !groupIdList.isEmpty()) {
            groupIdSet = new HashSet<>(groupIdList); // List → Set,自动去重
        }
        return sseService.registerClient(clientId, groupIdSet);
    }
 
    /**
     * 手动推送(调用 Service 的推送方法)
     */
    @PostMapping("/push")
    public String push(@RequestBody Map<String, String> params) {
        String clientId = params.get("clientId");
        String message = params.get("message");
        return sseService.sendMessage(clientId, "business", message);
    }
 
    /**
     * 获取在线客户端数量
     */
    @GetMapping("/count")
    public String getClientCount() {
        return "当前在线客户端数量:" + sseService.getConnectedClientCount();
    }
}

2、再实现控制层:

package com.xxx.controller;
 
import com.qiang.service.SseService;
import com.qiang.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
@RestController
@RequestMapping("/sse")
public class SseController {
 
    // 注入 SSE 服务类
    @Autowired
    private SseService sseService;
    @Autowired
    private UserService userService;
 
    /**
     * 建立 SSE 连接(调用 Service 的注册方法)
     */
    @GetMapping("/connect")
    public SseEmitter connect(@RequestParam String clientId) {
        // 查询会话成员表,userId通过客户端id获取该用户所有的群聊
        String userId = clientId.split("_")[2].trim();
        List<String> groupIdList = userService.getUserGroupChat(userId);
        Set<String> groupIdSet = new HashSet<>();
        if (groupIdList != null && !groupIdList.isEmpty()) {
            groupIdSet = new HashSet<>(groupIdList); // List → Set,自动去重
        }
        return sseService.registerClient(clientId, groupIdSet);
    }
 
    /**
     * 手动推送(调用 Service 的推送方法)
     */
    @PostMapping("/push")
    public String push(@RequestBody Map<String, String> params) {
        String clientId = params.get("clientId");
        String message = params.get("message");
        return sseService.sendMessage(clientId, "business", message);
    }
 
    /**
     * 获取在线客户端数量
     */
    @GetMapping("/count")
    public String getClientCount() {
        return "当前在线客户端数量:" + sseService.getConnectedClientCount();
    }
}

2、客户端(Electron 配置):

eventsource 是浏览器对象

我是在客户端主进程中接收消息的,要下载依赖。如果是在渲染进程或者是普通前端使用则不需要下载依赖

"eventsource": "^2.0.2"

1、工具函数:

// src/util/sseClient.js(主进程专用)
const EventSource = require('eventsource');
 
class SseClient {
    constructor(clientId) {
        this.clientId = clientId;
        this.isConnected = false; // 标记是否真正连接成功
        this.initSse();
    }
 
    initSse() {
        this.close(); // 关闭旧连接
 
        const sseUrl = `http://localhost:8088/sse/connect?clientId=${this.clientId}`;
        console.log(`[SSE] 尝试连接:${sseUrl}`);
 
        this.es = new EventSource(sseUrl);
 
        // 1. 连接成功(标记真正的连接状态)
        this.es.onopen = () => {
            this.isConnected = true;
            console.log('[SSE] 连接成功');
        };
 
        // 2. 优化错误处理(过滤无害错误)
        this.es.onerror = (e) => {
            // 过滤:连接成功前的无消息错误(无害)
            if (!this.isConnected && e.message === undefined) {
                console.log('[SSE] 初始化阶段临时错误(无害):', e.type);
                return; // 不打印错误,避免干扰
            }
 
            // 真正的错误(连接断开/失败)
            this.isConnected = false;
            console.error('[SSE] 真正的连接错误:', {
                type: e.type,
                message: e.message || '未知错误',
                readyState: this.es.readyState // 0:连接中, 1:已连接, 2:已关闭
            });
 
            // 仅在连接关闭时重连
            if (this.es.readyState === EventSource.CLOSED) {
                console.log(`[SSE] 3秒后尝试重连...`);
                setTimeout(() => this.initSse(), 3000);
            }
        };
 
        // 3. 正常接收消息
        this.es.onmessage = (e) => {
            try {
                const cleanData = e.data.replace(/^data: /, '').trim();
                const messageObj = JSON.parse(cleanData);
                // 打印解析结果(验证)
                console.log('[SSE] 解析后的完整对象:', messageObj);
            } catch (err) {
                // 解析失败时的容错
                console.warn('[SSE] 解析失败,原始数据:', e.data);
                console.error('[SSE] 解析错误详情:', err);
            }
        };
 
        // 4. 监听自定义事件(如 notification/business)
        this.es.addEventListener('notification', (e) => {
            const data = JSON.parse(e.data);
            console.log('[SSE] 通知消息:', data);
        });
    }
 
    close() {
        if (this.es) {
            this.es.close();
            this.es = null;
            this.isConnected = false;
        }
    }
 
    // 手动推送(修复后的 POST 版本)
    async triggerPush(message) {
        if (!this.isConnected) {
            console.warn('[SSE] 未连接,无法推送');
            return null;
        }
        try {
            const response = await fetch(`${this.serverUrl || 'http://localhost:8088'}/sse/push`, {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ clientId: this.clientId, message })
            });
            const result = await response.text();
            console.log('[SSE] 手动推送结果:', result);
            return result;
        } catch (err) {
            console.error('[SSE] 手动推送失败:', err);
            return null;
        }
    }
}
 
module.exports = SseClient;

2、调用:

// clientId 要唯一,否则服务端推送消息时会有影响 
new SseClient("自定义的clientId");

到此这篇关于Springboot使用SSE推送消息到客户端的实现的文章就介绍到这了,更多相关Springboot SSE推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文