Springboot+Stomp协议实现聊天功能
作者:suyukangchen
本示例实现一个功能,前端通过websocket发送消息给后端服务,后端服务接收到该消息时,原样将消息返回给前端,前端技术栈html+stomp.js,后端SpringBoot,需要的朋友可以参考下
前端代码
这里我对Stomp.js进行了一个简单的封装,写在stomp-client.js里面
/** * 对 stomp 客户端进行封装 */ var client; var subscribes = []; var errorTimes = 0; var endpoint = "/ws"; /** * 建立websocket连接 * @param {Function} onConnecting 开始连接时的回调 * @param {Function} onConnected 连接成功回调 * @param {Function} onError 连接异常或断开回调 */ function connect(onConnecting, onConnected, onError) { onConnecting instanceof Function && onConnecting(); var sock = new SockJS(endpoint); client = Stomp.over(sock); console.log("ws: start connect to " + endpoint); client.connect({}, function (frame) { errorTimes = 0; console.log('connected: ' + frame); // 连接成功后重新订阅 subscribes.forEach(function (item) { client.subscribe(item.destination, function (resp) { console.debug("ws收到消息: ", resp); item.cb(JSON.parse(resp.body)); }); }); onConnected instanceof Function && onConnected(); }, function (err) { errorTimes = errorTimes > 8 ? 0 : errorTimes; var nextTime = ++errorTimes * 3000; console.warn("与服务器断开连接," + nextTime + " 秒后重新连接", err); setTimeout(function () { console.log("尝试重连……"); connect(onConnecting, onConnected, onError); }, nextTime); onError instanceof Function && onError(); }); } /** * 订阅消息,若当前未连接,则会在连接成功后自动订阅 * * 注意,为防止重连导致重复订阅,请勿使用匿名函数做回调 * * @param {String} destination 目标 * @param {Function} cb 回调 */ function subscribe(destination, cb) { var exist = subscribes.filter(function (sub) { return sub.destination === destination && sub.cb === cb }); // 防止重复订阅 if (exist && exist.length) { return; } // 记录所有订阅,在连接成功时统一处理 subscribes.push({ destination: destination, cb: cb }); if (client && client.connected) { client.subscribe(destination, function (resp) { console.debug("ws收到消息: ", resp); cb instanceof Function && cb(JSON.parse(resp.body)); }); } else { console.warn("ws未连接,暂时无法订阅:" + destination) } } /** * 发送消息 * @param {String} destination 目标 * @param {Object} msg 消息体对象 */ function send(destination, msg) { if (!client) { console.error("客户端未连接,无法发送消息!") } client.send(destination, {}, JSON.stringify(msg)); } window.onbeforeunload = function () { // 当窗口关闭时断开连接 if (client && client.connected) { client.disconnect(function () { console.log("websocket disconnected "); }); } };
前端的html页面index.html如下:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>STOMP</title> </head> <body> <h1 id="tip">Welcome!</h1> <p>状态: <span id="status"></span></p> <input type="text" id="content" placeholder="请输入要发送的消息"> <br> <button onclick="sendTextMsg()">发送</button> <ul id="ul"> </ul> <script th:src="@{lib/sockjs.min.js}"></script> <script th:src="@{lib/stomp.min.js}"></script> <script th:src="@{stomp-client.js}"></script> <script> connect(function () { statusChange("连接中..."); }, function () { statusChange("在线"); // 注意,为防止重连导致重复订阅,请勿使用匿名函数做回调 subscribe("/user/topic/subNewMsg", onNewMsg); }, function () { statusChange("离线"); }); function onNewMsg(msg) { var li = document.createElement("li"); li.innerText = msg.content; document.getElementById("ul").appendChild(li); } function sendTextMsg() { var content = document.getElementById("content").value; var msg = { msgType: 1, content: content }; send("/app/echo", msg); } function statusChange(status) { document.getElementById("status").innerText = status; } </script> </body> </html>
后端代码
依赖引入,主要引入下面的包,其它的包略过
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
配置类
@Slf4j @Setter @Configuration @EnableWebSocketMessageBroker @ConfigurationProperties(prefix = "websocket") @RequiredArgsConstructor(onConstructor_ = {@Autowired}) public class WebSocketConfig implements WebSocketMessageBrokerConfigurer, ApplicationListener<BrokerAvailabilityEvent> { private final BrokerConfig brokerConfig; private String[] allowOrigins; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 继承DefaultHandshakeHandler并重写determineUser方法,可以自定义如何确定用户 // 添加方法:registry.addEndpoint("/ws").setHandshakeHandler(handshakeHandler) registry.addEndpoint("/ws") .setAllowedOrigins(allowOrigins) .withSockJS(); } /** * 配置消息代理 */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setApplicationDestinationPrefixes("/app"); if (brokerConfig.isUseSimpleBroker()) { // 使用 SimpleBroker // 配置前缀, 有这些前缀的消息会路由到broker registry.enableSimpleBroker("/topic", "/queue") //配置stomp协议里, server返回的心跳 .setHeartbeatValue(new long[]{10000L, 10000L}) //配置发送心跳的scheduler .setTaskScheduler(new DefaultManagedTaskScheduler()); } else { // 使用外部 Broker // 指定前缀,有这些前缀的消息会路由到broker registry.enableStompBrokerRelay("/topic", "/queue") // 广播用户目标,如果要推送的用户不在本地,则通过 broker 广播给集群的其他成员 .setUserDestinationBroadcast("/topic/log-unresolved-user") // 用户注册广播,一旦有用户登录,则广播给集群中的其他成员 .setUserRegistryBroadcast("/topic/log-user-registry") // 虚拟地址 .setVirtualHost(brokerConfig.getVirtualHost()) // 用户密码 .setSystemLogin(brokerConfig.getUsername()) .setSystemPasscode(brokerConfig.getPassword()) .setClientLogin(brokerConfig.getUsername()) .setClientPasscode(brokerConfig.getPassword()) // 心跳间隔 .setSystemHeartbeatSendInterval(10000) .setSystemHeartbeatReceiveInterval(10000) // 使用 setTcpClient 以配置多个 broker 地址,setRelayHost/Port 只能配置一个 .setTcpClient(createTcpClient()); } } /** * 创建 TcpClient 工厂,用于配置多个 broker 地址 */ private ReactorNettyTcpClient<byte[]> createTcpClient() { return new ReactorNettyTcpClient<>( // BrokerAddressSupplier 用于获取中继地址,一次只使用一个,如果该中继出错,则会获取下一个 client -> client.addressSupplier(brokerConfig.getBrokerAddressSupplier()), new StompReactorNettyCodec()); } @Override public void onApplicationEvent(BrokerAvailabilityEvent event) { if (!event.isBrokerAvailable()) { log.warn("stomp broker is not available!!!!!!!!"); } else { log.info("stomp broker is available"); } } }
消息处理
@Slf4j @Controller @RequiredArgsConstructor(onConstructor_ = {@Autowired}) public class StompController { private final SimpMessageSendingOperations msgOperations; private final SimpUserRegistry simpUserRegistry; /** * 回音消息,将用户发来的消息内容加上 Echo 前缀后推送回客户端 */ @MessageMapping("/echo") public void echo(Principal principal, Msg msg) { String username = principal.getName(); msg.setContent("Echo: " + msg.getContent()); msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); int userCount = simpUserRegistry.getUserCount(); int sessionCount = simpUserRegistry.getUser(username).getSessions().size(); log.info("当前本系统总在线人数: {}, 当前用户: {}, 该用户的客户端连接数: {}", userCount, username, sessionCount); } }
实现效果
报文分析
开启调试模式,我们根据报文来分析一下前后端互通的报文
握手
客户端请求报文如下
GET ws://localhost:8025/ws/035/5hy4avgm/websocket HTTP/1.1 Host: localhost:8025 Connection: Upgrade Pragma: no-cache Cache-Control: no-cache User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36 Upgrade: websocket Origin: http://localhost:8025 Sec-WebSocket-Version: 13 Accept-Encoding: gzip, deflate, br Accept-Language: zh-CN,zh;q=0.9 Cookie: 略 Sec-WebSocket-Key: PlMHmdl2JRzDAVk3feOaeA== Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
服务端响应握手请求
HTTP/1.1 101 Upgrade: websocket Connection: upgrade Sec-WebSocket-Accept: 9CKY8n1j/cHoKsWmpmX4pNlQuZg= Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15 X-Content-Type-Options: nosniff X-XSS-Protection: 1; mode=block Cache-Control: no-cache, no-store, max-age=0, must-revalidate Pragma: no-cache Expires: 0 X-Frame-Options: DENY Date: Thu, 08 Feb 2024 06:58:28 GMT
stomp报文分析
在浏览器消息一栏,我们可以看到长连接过程中通信的报文
下面来简单分析一下stomp的报文
客户端请求连接
其中\n表示换行
[ "CONNECT\naccept-version:1.1,1.0\nheart-beat:10000,10000\n\n\u0000" ]
可以看到请求连接的命令是CONNECT,连接报文里面还包含了心跳的信息
服务端返回连接成功
[ "CONNECTED\nversion:1.1\nheart-beat:10000,10000\nuser-name:admin\n\n\u0000" ]
CONNECTED是服务端连接成功的命令,报文中也包含了心跳的信息
客户端订阅
订阅的目的地是:/user/topic/subNewMsg
["SUBSCRIBE\nid:sub-0\ndestination:/user/topic/subNewMsg\n\n\u0000"]
客户端发送消息
发送的目的地是:/app/echo
[ "SEND\ndestination:/app/echo\ncontent-length:35\n\n{\"msgType\":1,\"content\":\"你好啊\"}\u0000" ]
服务端响应消息
响应的目的地是:/user/topic/subNewMsg,当订阅了这个目的地的,方法,将会被回调
[ "MESSAGE\ndestination:/user/topic/subNewMsg\ncontent-type:application/json;charset=UTF-8\nsubscription:sub-0\nmessage-id:5hy4avgm-1\ncontent-length:41\n\n{\"content\":\"Echo: 你好啊\",\"msgType\":1}\u0000" ]
心跳报文
可以看到,约每隔10S,客户端和服务端都有一次心跳报文,发送的报文内容为一个回车。
[\n]
项目链接:https://gitee.com/syk1234/stomp-demo.git
以上就是Springboot+Stomp协议实现聊天功能的详细内容,更多关于Springboot+Stomp聊天的资料请关注脚本之家其它相关文章!