java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringCloud Netty实现WebSocket

SpringCloud整合Netty集群实现WebSocket的示例代码

作者:黑加菲

文章主要介绍了SpringCloud整合Netty集群实现WebSocket的相关内容,包括服务注册和发现中心的配置,如使用Nacos、CommandLineRunner启动Netty服务等,还介绍了通过Redis实现消息发布订阅的机制,需要的朋友可以参考下

引言

在分布式系统中,Spring Cloud 为微服务架构提供了丰富的功能,而 Netty 是一个高性能的网络通信框架。将二者结合实现 Socket 集群,可以满足大规模、高性能网络通信的需求,实现前后端间高效稳定的通信。

1. 服务注册和发现中心

这里服务注册和发现中心以Nacos为例(需要先启动一个Nacos服务器)。

微服务注册: 在每一个微服务项目中,添加Nacos客户端连接,并在配置文件中指定服务名称和端口。例如:

# Tomcat
server:
  port: 9201
  # Settings read by NettyServer via @Value("${server.netty.port}") and
  # @Value("${server.netty.application.name}").
  netty:
    port: 10201
    application:
      # NOTE(review): the gateway route in this article points at lb:ws://soc-netty-server,
      # which differs from this name — confirm which one is intended.
      name: yhy-netty-server

# Spring
spring:
  application:
    # Application name
    name: soc-dmoasp-system
  profiles:
    # Active environment profile
    active: dev
  cloud:
    nacos:
      discovery:
        # Service registry address
        server-addr: nacos-registry:8858
      config:
        # Config center address
        server-addr: nacos-registry:8858
        file-extension: yml
        # Shared configuration files
        shared-configs:
          - data-id: application.${spring.cloud.nacos.config.file-extension}
            refresh: true
          - data-id: soc-dmoasp-redission.${spring.cloud.nacos.config.file-extension}
          - data-id: soc-dmoasp-druid.${spring.cloud.nacos.config.file-extension}

这是一个基本的服务配置。其中关于netty的application.name和port,可以通过Nacos的NamingService类手动注册到Nacos中。

1.1. Netty服务器搭建

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-reactor-netty</artifactId>
</dependency>
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.soc.dmoasp.system.server.handler.WebSocketIdleStateHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.net.InetAddress;
import java.util.Properties;

/**
 * Starts a Netty WebSocket server alongside the Spring Boot application and
 * registers it in Nacos as a separate service.
 *
 * <p>The server binds to {@code server.netty.port} and is registered under the
 * service name {@code server.netty.application.name}. Registration happens only
 * after the port has been bound, and the instance is deregistered again when
 * the bean is destroyed.
 *
 * @author dsw
 * @date 2024/11/18 17:34
 */
@Component
public class NettyServer implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    @Autowired
    private NacosDiscoveryProperties nacosDiscoveryProperties;


    @Value("${server.netty.port}")
    private Integer nettyPort;
    @Value("${server.netty.application.name}")
    private String nettyName;

    EventLoopGroup bossGroup;

    EventLoopGroup workGroup;

    /** Nacos client used for the manual registration; kept so the instance can be removed on shutdown. */
    private NamingService namingService;

    /** The instance that was registered in Nacos, or {@code null} if nothing is registered. */
    private Instance registeredInstance;

    /**
     * Boots the Netty server once the Spring context is ready and installs a JVM
     * shutdown hook that releases the Netty resources.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the server cannot be started
     */
    @Override
    public void run(String... args) throws Exception {
        log.info("初始化netty配置开始");
        // The Netty port must differ from the Spring Boot HTTP port.
        this.start();
        // Release the Netty resources when the JVM goes down.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                this.destroy();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the JVM shutdown sequence can observe it.
                Thread.currentThread().interrupt();
                log.error("netty异常:{}", e.getMessage(), e);
            }
        }));
    }

    /**
     * Configures the channel pipeline, binds the server port and, once the bind
     * has succeeded, registers the server in Nacos.
     *
     * <p>NOTE(review): {@link #run(String...)} calls this method on {@code this},
     * which does not go through Spring's proxy, so {@code @Async} presumably has
     * no effect for that call path — confirm whether it is still needed.
     *
     * @throws InterruptedException declared for interface compatibility; an
     *         interruption while binding is logged and the interrupt flag restored
     */
    @Async
    public void start() throws InterruptedException {
        try {
            bossGroup = new NioEventLoopGroup(1);
            workGroup = new NioEventLoopGroup(10);
            ServerBootstrap bootstrap = new ServerBootstrap();
            // bossGroup accepts incoming TCP connections, workGroup handles read/write on them.
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline
                                    // HTTP codec
                                    .addLast(new HttpServerCodec())
                                    .addLast(new ChunkedWriteHandler())
                                    // Aggregate HTTP message fragments
                                    .addLast(new HttpObjectAggregator(1024 * 1024))
                                    // Upgrade HTTP to the WebSocket protocol on /socket
                                    .addLast(new WebSocketServerProtocolHandler("/socket"));
                        }
                    });
            // sync() blocks until the bind attempt has completed.
            ChannelFuture future = bootstrap.bind(nettyPort).sync();
            if (future.isSuccess()) {
                log.info("Server started and listen on:{}", future.channel().localAddress());
                log.info("启动 Netty Server");
                // Register only after the port is bound, so that Nacos never
                // advertises an instance that is not actually listening.
                registerNamingService(nettyName, nettyPort);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("netty异常:{}", e.getMessage(), e);
        }
    }

    /**
     * Registers the Netty server as a service instance in Nacos.
     *
     * @param nettyName service name to register under
     * @param nettyPort port the Netty server listens on
     * @throws RuntimeException if the Nacos client cannot be created or the registration fails
     */
    private void registerNamingService(String nettyName, Integer nettyPort) {
        try {
            Properties properties = new Properties();
            // Properties rejects null values, so optional settings are only copied when present.
            putIfPresent(properties, PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
            putIfPresent(properties, PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
            putIfPresent(properties, PropertyKeyConst.USERNAME, nacosDiscoveryProperties.getUsername());
            putIfPresent(properties, PropertyKeyConst.PASSWORD, nacosDiscoveryProperties.getPassword());
            namingService = NamingFactory.createNamingService(properties);
            InetAddress address = InetAddress.getLocalHost();
            // Describe the service instance.
            Instance instance = new Instance();
            instance.setIp(address.getHostAddress());
            instance.setPort(nettyPort);
            instance.setWeight(1.0);
            instance.setHealthy(true);
            namingService.registerInstance(nettyName, nacosDiscoveryProperties.getGroup(), instance);
            registeredInstance = instance;
        } catch (Exception e) {
            throw new RuntimeException("Failed to register Netty service '" + nettyName + "' in Nacos", e);
        }
    }

    /**
     * Copies {@code value} into {@code properties} unless it is {@code null}.
     */
    private static void putIfPresent(Properties properties, String key, String value) {
        if (value != null) {
            properties.setProperty(key, value);
        }
    }

    /**
     * Removes the instance registered by {@link #registerNamingService} from Nacos.
     * Failures are logged and do not prevent the Netty shutdown.
     */
    private void deregisterNamingService() {
        if (namingService == null || registeredInstance == null) {
            return;
        }
        try {
            namingService.deregisterInstance(nettyName, nacosDiscoveryProperties.getGroup(), registeredInstance);
        } catch (Exception e) {
            log.warn("netty异常:{}", e.getMessage(), e);
        } finally {
            registeredInstance = null;
        }
    }

    /**
     * Releases resources: deregisters from Nacos and shuts down both event loop groups.
     *
     * <p>Synchronized because it is reachable from both the {@code @PreDestroy}
     * callback and the JVM shutdown hook installed in {@link #run(String...)}.
     *
     * @throws InterruptedException if interrupted while waiting for the event loops to stop
     */
    @PreDestroy
    public synchronized void destroy() throws InterruptedException {
        deregisterNamingService();
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
        log.info("关闭Netty");

    }

}

使用CommandLineRunner接口实现run方法在启动项目的时候把Netty服务带起。

bossGroup workGroup 的角色区别

registerNamingService方法

这时候可以看到,我们在配置文件中配置了 server.netty.port 和 server.netty.application.name 这两个参数,分别对应netty的端口和netty的微服务应用名。

registerNamingService方法用于往Nacos中注册服务,这里通过NamingService类的registerInstance方法将netty服务注册进Nacos中。

1.2. Gateway网关转发

微服务中所有的请求都是由网关转发,这里使用Gateway转发。

# Spring configuration
spring:
  cloud:
    gateway:
      discovery:
        locator:
          lowerCaseServiceId: true
          enabled: true
      routes:
        # System module
        - id: soc-dmoasp-system
          uri: lb://soc-dmoasp-system
          predicates:
            - Path=/system/**
          filters:
            - StripPrefix=1
        # Netty service (WebSocket traffic, load-balanced by service name).
        # NOTE(review): the service id here must equal the name the Netty server
        # registers in Nacos (server.netty.application.name) — confirm they match.
        - id: netty-server
          uri: lb:ws://soc-netty-server
          predicates:
            - Path=/netty-server/**
          filters:
            - StripPrefix=1
# URIs that skip the permission check
security:
  ignore:
    whites:
      - /auth/logout
      - /auth/login
      - /auth/register
      - /*/v2/api-docs
      - /csrf
      # Netty connection address
      - /netty-server/**

配置文件中添加Netty路由,在鉴权网关中需要将socket地址放行,不进行权限验证。例如:

/**
 * Gateway global filter (excerpt). Requests whose path matches the configured
 * whitelist are passed down the filter chain without further checks; the rest
 * of the filter logic is elided in this excerpt.
 */
@Component
@RefreshScope
public class AuthFilter implements GlobalFilter, Ordered
{
    private static final Logger log = LoggerFactory.getLogger(AuthFilter.class);

    // URI patterns excluded from filtering; maintained in Nacos
    @Autowired
    private IgnoreWhiteProperties ignoreWhite;

    /**
     * Filters an incoming gateway request.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining filter chain
     * @return completion signal of the downstream chain for whitelisted paths
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain)
    {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpRequest.Builder mutate = request.mutate();

        String url = request.getURI().getPath();
        // Skip paths that do not require verification
        if (StringUtils.matches(url, ignoreWhite.getWhites()))
        {
            return chain.filter(exchange);
        }
        // (the remainder of the method is omitted in the original excerpt)
        ......
    }
}

启动Gateway和System模块

启动完成后System模块会打印NettyServer输出的启动日志,Nacos中也会有手动注册的Netty服务。

通过ws://127.0.0.1:8080/netty-server/socket就可以直接连接上Netty服务器(8080为Gateway的端口)。

2. 鉴权、心跳、客户端与服务端之间的通信

2.1. 鉴权

创建AuthHandler类,继承SimpleChannelInboundHandler类重写channelRead0方法,channelRead0中可以监听到客户端往服务端发送的消息。 例如:

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.core.constant.TokenConstants;
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import com.soc.dmoasp.common.core.utils.JwtUtils;
import com.soc.dmoasp.common.core.utils.StringUtils;
import com.soc.dmoasp.common.redis.service.RedisService;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.jsonwebtoken.Claims;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * netty鉴权处理
 * @author dsw
 * @date 2024/11/18 17:55
 */
public class AuthHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    Logger log = LoggerFactory.getLogger(AuthHandler.class);

    private final RedisTemplate<String, Object> redisTemplate;

    private final RedisService redisService;

    public AuthHandler(RedisTemplate<String, Object> stringObjectRedisTemplate, RedisService redisService) {
        redisTemplate = stringObjectRedisTemplate;
        this.redisService = redisService;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        try {
            JSONObject clientMessage = JSON.parseObject(textWebSocketFrame.text());
            //获取code 只判断code为鉴权的消息类型
            Integer code = clientMessage.getInteger("code");
            if (NettyMsgEnum.AUTH_MESSAGE.getCode().equals(code)) {
                //获取token
                String token = clientMessage.getString("token");
                if (StringUtils.isEmpty(token))
                {
                    ctx.channel().writeAndFlush(NettyResult.authFail("令牌不能为空"));
                    ctx.close();
                }
                // 如果前端设置了令牌前缀,则裁剪掉前缀
                if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX))
                {
                    token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY);
                }
                //JWT校验
                Claims claims = JwtUtils.parseToken(token);
                if (claims == null)
                {
                    ctx.channel().writeAndFlush(NettyResult.authFail("令牌已过期或验证不正确"));
                    ctx.close();
                }
                String userkey = JwtUtils.getUserKey(claims);
                //从Redis中查看是否有这个Token没有则不能登录
                boolean islogin = redisService.hasKey(getTokenKey(userkey));
                if (!islogin)
                {
                    ctx.channel().writeAndFlush(NettyResult.authFail("登录状态已过期"));
                    ctx.close();
                }
                //获取用户保存至Socket连接会话中
                String userId = JwtUtils.getUserId(claims);
                AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
                ctx.channel().attr(userIdKey).setIfAbsent(userId);
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("userId",userId);
                log.info("有新的Socket客户端链接 userId :{}", userId);
                //将连接信息保存至Redis中key为userId value为ctx.channel().id()
                redisTemplate.opsForHash().put(CacheConstants.getUserChannelKey(),userId,ctx.channel().id());
                ctx.channel().writeAndFlush(NettyResult.success(NettyMsgEnum.AUTH_MESSAGE.getCode(), "鉴权成功", jsonObject));
                //鉴权完成后移除AuthHandler消息监听
                ctx.pipeline().remove(AuthHandler.class);
            } else {
                ctx.channel().writeAndFlush(NettyResult.authFail("请先鉴权,在发送其他类型请求!"));
                ctx.close();
            }
        } catch (Exception e) {
            log.error(e.getMessage());
            ctx.channel().writeAndFlush(NettyResult.authFail("鉴权失败"));
            ctx.close();
        }
    }

    /**
     * 获取缓存key
     */
    private String getTokenKey(String token)
    {
        return CacheConstants.LOGIN_TOKEN_KEY + token;
    }
}

泛型TextWebSocketFrame表示接收文本类型的消息。

其中连接的用户信息保存到Redis中,RedisTemplate<String, Object> redisTemplate对象是用来保存Netty连接信息的,序列化使用的是String(用户信息用String存储,使用JSON序列化会反序列化失败),对应接收的JSON串如下:

{"code":1001,"token":"Bearer XXXX"}

code取NettyMsgEnum中的code,token则是登录时生成的token令牌。

鉴权成功后会将AuthHandler从管道中移除,该会话后续的消息交互不再进入AuthHandler。

NettyMsgEnum如下:

/**
 * Message types exchanged over the Netty WebSocket connection.
 *
 * <p>Each constant carries a numeric code, a human-readable description and
 * the name of the strategy bean that handles it.
 *
 * @author dsw
 * @date 2024/11/18 17:58
 */
public enum NettyMsgEnum {

    AUTH_MESSAGE(1001, "鉴权消息","Auth-Netty"),
    // Example payload: {'code':1003,'data':{'unreadCount':0}}
    NOTICE_MESSAGE(1003, "公告通知消息","Notice-Netty"),
    HEART_MESSAGE(1006, "心跳消息","Heart-Netty"),
    ERROR_MESSAGE(-1, "错误",null);

    /** Numeric message code. */
    private final Integer code;

    /** Description of the message type. */
    private final String info;

    /** Name of the handling strategy; {@code null} for {@link #ERROR_MESSAGE}. */
    private final String strategyName;

    NettyMsgEnum(Integer code, String info, String strategyName){
        this.code = code;
        this.info = info;
        this.strategyName = strategyName;
    }

    /**
     * Looks up the constant for a message code.
     *
     * @param code message code, may be {@code null}
     * @return the matching constant, or {@link #ERROR_MESSAGE} when none matches
     */
    public static NettyMsgEnum getByCode(Integer code) {
        return java.util.Arrays.stream(values())
                .filter(candidate -> candidate.code.equals(code))
                .findFirst()
                .orElse(ERROR_MESSAGE);
    }

    /** @return the numeric message code */
    public Integer getCode() {
        return this.code;
    }

    /** @return the description of the message type */
    public String getInfo() {
        return this.info;
    }

    /** @return the name of the handling strategy, possibly {@code null} */
    public String getStrategyName() {
        return this.strategyName;
    }
}

NettyResult如下:

import com.alibaba.fastjson2.JSON;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.io.Serializable;

/**
 * Response envelope sent to WebSocket clients.
 *
 * <p>The static factories serialize an envelope to JSON and wrap it in a
 * {@link TextWebSocketFrame} ready to be written to a channel.
 *
 * @author dsw
 * @date 2024/11/18 18:02
 */
public class NettyResult  implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer code;

    private String message;

    private Object data;

    /**
     * @param code    result or message-type code
     * @param message human-readable message, may be {@code null}
     * @param data    payload, may be {@code null}
     */
    public NettyResult(Integer code, String message, Object data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }

    /** Serializes an envelope with the given fields into a text frame. */
    private static TextWebSocketFrame toFrame(Integer code, String message, Object data) {
        String json = JSON.toJSONString(new NettyResult(code, message, data));
        return new TextWebSocketFrame(json);
    }

    /** Generic failure frame (code -1). */
    public static TextWebSocketFrame fail(String message) {
        return toFrame(-1, message, null);
    }

    /** Authentication failure frame (code -2). */
    public static TextWebSocketFrame authFail(String message) {
        return toFrame(-2, message, null);
    }

    /** Success frame (code 200) carrying only a message. */
    public static TextWebSocketFrame success( String message) {
        return toFrame(200, message, null);
    }

    /** Success frame with a caller-supplied code and payload, without a message. */
    public static TextWebSocketFrame success(Integer code, Object data) {
        return toFrame(code, null, data);
    }

    /** Success frame with a caller-supplied code, message and payload. */
    public static TextWebSocketFrame success(Integer code, String message, Object data) {
        return toFrame(code, message, data);
    }

    public Integer getCode() {
        return this.code;
    }

    public String getMessage() {
        return this.message;
    }

    public Object getData() {
        return this.data;
    }
}

最后到NettyServer中的ChannelInitializer加入AuthHandler:

我们重启项目后连接socket查看结果

如果不鉴权直接发送消息,服务端会主动断开连接,客户端需要重连。

这就代表已经连接成功了。

2.2. 空闲检测

创建WebSocketIdleStateHandler类继承IdleStateHandler类,重写channelIdle方法。

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * Idle detection for WebSocket connections: closes a channel when no data has
 * been read from it for the configured reader-idle time.
 *
 * @author dsw
 * @date 2024/11/18 11:47
 */
public class WebSocketIdleStateHandler extends IdleStateHandler {

    Logger log = LoggerFactory.getLogger(WebSocketIdleStateHandler.class);

    /**
     * Default reader-idle time, in seconds.
     */
    private static final int DEFAULT_READER_IDLE_TIME = 10;

    /**
     * Uses the default reader-idle time of 10 seconds; writer and all-idle
     * detection are disabled.
     */
    public WebSocketIdleStateHandler() {
        super(DEFAULT_READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    /**
     * Uses explicit idle times given in seconds.
     *
     * @param readerIdleTimeSeconds reader-idle time
     * @param writerIdleTimeSeconds writer-idle time
     * @param allIdleTimeSeconds    all-idle (read and write) time
     */
    public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
        super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
    }

    /**
     * Uses explicit idle times in an arbitrary unit.
     *
     * @param readerIdleTime reader-idle time
     * @param writerIdleTime writer-idle time
     * @param allIdleTime    all-idle (read and write) time
     * @param unit           unit of the three times
     */
    public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
        super(readerIdleTime, writerIdleTime, allIdleTime, unit);
    }

    /**
     * Invoked when an idle event fires. A reader-idle event closes the channel;
     * every event is then forwarded to the superclass.
     */
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        boolean readerIdle = evt.state().equals(IdleState.READER_IDLE);
        if (readerIdle) {
            Channel idleChannel = ctx.channel();
            log.debug("服务端未检测到客户端【{}】的心跳包,强制关闭客户端!", idleChannel.id());
            idleChannel.close();
        }
        super.channelIdle(ctx, evt);
    }
}

以上实现了父类的构造函数,可以指定具体的空闲时间。当空闲时会触发channelIdle方法,则服务端主动断开连接。

最后到NettyServer中的ChannelInitializer加入WebSocketIdleStateHandler:

加到最前面。

示例设置了10秒断开,需要使用中自行调整。

2.3. 消息通信

2.3.1. 接收客户端的消息

创建WebSocketHandler类,继承SimpleChannelInboundHandler类重写channelRead0、handlerAdded、handlerRemoved、exceptionCaught方法。

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import com.soc.dmoasp.system.server.config.NettyConfig;
import com.soc.dmoasp.system.server.strategy.NettyStrategy;
import com.soc.dmoasp.system.server.strategy.NettyStrategyFactory;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;


/**
 * Handles text frames of an authenticated WebSocket connection.
 *
 * <p>Each incoming message is parsed as JSON, dispatched by its {@code code}
 * to a {@link NettyStrategy}, and the strategy's result is written back to the
 * client. The handler also tracks the channel in the shared channel group and
 * removes the user-to-channel mapping from Redis when it is removed.
 *
 * @author dsw
 * @date 2024/11/18 10:20
 */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    /** Channel attribute holding the authenticated user id. */
    private static final AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf("userId");

    private final RedisTemplate<String, Object> redisTemplate;

    private final NettyStrategyFactory nettyStrategyFactory;

    /**
     * @param redisTemplate        template holding the user-to-channel mapping
     * @param nettyStrategyFactory factory resolving the strategy for a message type
     */
    public WebSocketHandler(RedisTemplate<String, Object> redisTemplate, NettyStrategyFactory nettyStrategyFactory) {
        this.redisTemplate = redisTemplate;
        this.nettyStrategyFactory = nettyStrategyFactory;
    }

    /**
     * Called when this handler is added to a channel's pipeline; registers the
     * channel in the shared channel group.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * Reads one client message, runs the matching strategy and replies with its result.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        String userId = ctx.channel().attr(USER_ID_KEY).get();
        log.info("收到消息 userId:{} message:{}",userId,frame.text());
        JSONObject pullMessage = JSON.parseObject(frame.text());
        if (pullMessage == null) {
            // An empty payload parses to null; answer with a format error
            // instead of failing with a NullPointerException below.
            ctx.channel().writeAndFlush(NettyResult.fail("消息格式不正确"));
            return;
        }
        Integer code = pullMessage.getInteger("code");
        // Resolve the strategy for this message type.
        NettyStrategy nettyStrategy = nettyStrategyFactory.getNettyStrategy(NettyMsgEnum.getByCode(code));
        // Process the message.
        TextWebSocketFrame pushMessage = nettyStrategy.execute(pullMessage);
        // Return the result to the client.
        ctx.channel().writeAndFlush(pushMessage);
    }

    /**
     * Called when this handler is removed from the pipeline; drops the user's
     * channel mapping.
     *
     * <p>NOTE(review): the mapping is deleted by user id only, so if the same
     * user has already reconnected on another channel its mapping would presumably
     * be removed as well — confirm whether that case needs handling.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        String userId = ctx.channel().attr(USER_ID_KEY).get();
        log.info("用户下线了 userId:{}",userId);
        removeUserId(ctx);
    }

    /**
     * Logs the failure and reports it to the client.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log with the stack trace; previously the cause was only sent to the client.
        log.error("netty异常:{}", cause.getMessage(), cause);
        ctx.channel().writeAndFlush(NettyResult.fail("系统错误:" + cause.getMessage()));
    }

    /**
     * Removes the mapping between the channel's user and its channel from Redis.
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        String userId = ctx.channel().attr(USER_ID_KEY).get();
        if (StringUtils.isNotBlank(userId)) {
            redisTemplate.opsForHash().delete(CacheConstants.getUserChannelKey(), userId);
        }
    }
}

这里收到消息后通过一个策略模式进入不同的策略,通过NettyMsgEnum里面定义的code指定不同的策略类。

Strategy和StrategyFactory:

import com.alibaba.fastjson2.JSONObject;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * Strategy for processing one type of message received over the Netty
 * WebSocket connection.
 *
 * @author dsw
 * @date 2024/5/27 10:21
 */
public interface NettyStrategy {

    /**
     * Processes a client message and produces the frame to send back.
     *
     * @param message the parsed JSON message received from the client
     * @return the text frame to write to the client
     */
    TextWebSocketFrame execute(JSONObject message);
}
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Resolves the {@link NettyStrategy} that handles a given message type.
 *
 * @author dsw
 * @date 2024/11/18 10:20
 */
@Component
public class NettyStrategyFactory {

    Logger log = org.slf4j.LoggerFactory.getLogger(NettyStrategyFactory.class);

    /**
     * Strategies injected by the Spring container, keyed by name.
     */
    @Autowired
    private Map<String, NettyStrategy> nettyStrategy;

    /**
     * Returns the strategy registered under the message type's strategy name.
     *
     * @param nettyMsgEnum message type to resolve
     * @return the matching strategy
     * @throws RuntimeException if no strategy is registered under that name
     */
    public NettyStrategy getNettyStrategy(NettyMsgEnum nettyMsgEnum){
        final String strategyKey = nettyMsgEnum.getStrategyName();
        if (nettyStrategy.containsKey(strategyKey)) {
            return nettyStrategy.get(nettyMsgEnum.getStrategyName());
        }
        log.warn("没有对应的消息策略");
        throw new RuntimeException("没有对应的消息策略");
    }
}

我们实现一个心跳消息的策略:

import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import com.soc.dmoasp.system.server.strategy.NettyStrategy;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Component;

/**
 * Heartbeat strategy: answers a {@code "ping"} payload with {@code "pong"}.
 *
 * @author dsw
 * @date 2024/5/27 10:25
 */
@Component("Heart-Netty")
public class HeartStrategyImpl implements NettyStrategy {

    /**
     * Replies to a heartbeat message.
     *
     * @param message client message whose {@code data} field is expected to be {@code "ping"}
     * @return a heartbeat frame carrying {@code "pong"}, or a failure frame for any other payload
     */
    @Override
    public TextWebSocketFrame execute(JSONObject message) {
        final boolean isPing = "ping".equals(message.getString("data"));
        return isPing
                ? NettyResult.success(NettyMsgEnum.HEART_MESSAGE.getCode(), null, "pong")
                : NettyResult.fail("消息格式不正确");
    }
}

添加至ChannelInitializer后重启查看效果。

2.3.2. 发送消息给客户端

集群下面的Netty配置

方案1:使用Redis的发布订阅

方案2:使用MQ的发布订阅

我们这里使用Redis的发布订阅来实现。

添加Redis订阅器:

import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.redis.configure.FastJson2JsonRedisSerializer;
import com.soc.dmoasp.system.server.receiver.PushMsgRedisReceiver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import java.util.Collections;
import java.util.List;

/**
 * Wires the Redis pub/sub listener that receives socket push messages.
 *
 * @author dsw
 * @date 2024/11/18 12:07
 */
@Configuration
public class RedisReceiverConfig {

    /**
     * Creates the listener container and subscribes the adapter to the socket
     * push topic.
     *
     * @param redisConnectionFactory connection factory used by the container
     * @param listenerAdapter        adapter that receives the published messages
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
                                                                       MessageListenerAdapter listenerAdapter) {
        // Topic(s) to listen on: the socket push topic.
        PatternTopic pushTopic = PatternTopic.of(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC);
        List<PatternTopic> subscribedTopics = Collections.singletonList(pushTopic);

        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(redisConnectionFactory);
        listenerContainer.addMessageListener(listenerAdapter, subscribedTopics);
        return listenerContainer;
    }

    /**
     * Creates the adapter that delegates received messages to
     * {@link PushMsgRedisReceiver}, using the FastJson2 serializer.
     *
     * @param pushMsgRedisReceiver delegate that handles the messages
     * @return the configured adapter
     */
    @Bean
    @SuppressWarnings(value = { "unchecked", "rawtypes" })
    public MessageListenerAdapter listenerAdapter(PushMsgRedisReceiver pushMsgRedisReceiver) {
        FastJson2JsonRedisSerializer jsonSerializer = new FastJson2JsonRedisSerializer(Object.class);
        MessageListenerAdapter messageAdapter = new MessageListenerAdapter(pushMsgRedisReceiver);
        messageAdapter.setSerializer(jsonSerializer);
        return messageAdapter;
    }
}
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.core.exception.Asserts;
import com.soc.dmoasp.common.redis.dto.NettyMessage;
import com.soc.dmoasp.system.server.config.NettyConfig;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * Receives push messages published on the Redis topic and forwards them to
 * the target user's channel when that channel is found in the channel group.
 *
 * @author dsw
 * @date 2024/11/18 12:07
 */
@Component
public class PushMsgRedisReceiver {
    Logger log = LoggerFactory.getLogger(PushMsgRedisReceiver.class);

    @Autowired
    private RedisTemplate<String,Object> stringObjectRedisTemplate;

    /**
     * Handles one message from the subscribed Redis topic.
     *
     * @param nettyMessage the message to push
     * @param topic        the topic the message was published on
     */
    public void handleMessage(NettyMessage nettyMessage, String topic) {

        // Look up the channel id stored for the user.
        Object channelId = stringObjectRedisTemplate.opsForHash()
                .get(CacheConstants.getUserChannelKey(), nettyMessage.getUserId());
        if (channelId == null) {
            log.warn("推送消息失败,用户不在线! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage());
            Asserts.fail("推送消息失败,用户不在线!");
        }

        Channel target = NettyConfig.getChannelGroup().find((ChannelId) channelId);
        if (target == null) {
            // No channel with that id in the channel group.
            log.warn("推送消息失败,没有找到Channel! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage());
            return;
        }
        target.writeAndFlush(NettyResult.success(nettyMessage.getCode(),nettyMessage.getMessage()));
        log.info("推送消息成功! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage());
    }
}

发布订阅的机制就是集群中的所有节点都会收到消息,收到消息后每个netty节点都去找对应的消息会话通道,如果没找到则说明连接不在当前服务上,找到通道后则可以直接推送。 这里使用stringObjectRedisTemplate获取用户通道,避免序列化失败。

RedisService中实现发布消息

/**
 * Publishes a message through Redis publish/subscribe.
 *
 * @param channel the Redis channel (topic) to publish to
 * @param message the message to publish
 */
@Async
public void convertAndSend(String channel, Object message) {
    redisTemplate.convertAndSend(channel, message);
}

发布消息的工具类:

import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.redis.dto.NettyMessage;
import com.soc.dmoasp.common.redis.dto.PushSocketMsgDTO;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Set;

/**
 * Publishes socket push messages to the Redis topic so that the subscribed
 * Netty nodes can deliver them to connected clients.
 *
 * @author dsw
 * @date 2024/11/18 11:47
 */
@Service
public class PushSocketMsgService {

    Logger log = org.slf4j.LoggerFactory.getLogger(PushSocketMsgService.class);

    @Autowired
    private RedisTemplate<String, Object> stringObjectRedisTemplate;

    @Autowired
    private RedisService redisService;

    /**
     * Sends a message to every user that has an entry in the user-to-channel hash.
     *
     * @param msgDTO message code and body to send
     */
    public void pushMsgToAll( PushSocketMsgDTO msgDTO) {
        Set<Object> keys = stringObjectRedisTemplate.opsForHash().keys(CacheConstants.getUserChannelKey());
        if (keys == null) {
            // Nothing to iterate over; avoid a NullPointerException.
            return;
        }
        keys.forEach(key -> this.pushMsgToUser(msgDTO.getCode(), key.toString(), msgDTO.getMessage()));
    }

    /**
     * Sends a message to the users listed in the DTO. A missing user list is a
     * no-op, and {@code null} entries in the list are skipped.
     *
     * @param msgDTO message code, body and target user ids
     */
    public void pushMsgToUserList(PushSocketMsgDTO msgDTO) {
        if (msgDTO.getUserIdList() == null) {
            // A missing target list previously caused a NullPointerException in the loop.
            return;
        }
        for (Long userId : msgDTO.getUserIdList()) {
            if (userId == null) {
                continue;
            }
            this.pushMsgToUser(msgDTO.getCode(), userId.toString(), msgDTO.getMessage());
        }
    }

    /**
     * Publishes one message for one user on the socket push topic.
     *
     * @param code    message type code
     * @param userId  target user id
     * @param message message body
     */
    protected void pushMsgToUser(Integer code, String userId, JSONObject message) {
        // Publish so that whichever node holds the user's connection can deliver it.
        NettyMessage nettyMessage = new NettyMessage();
        nettyMessage.setUserId(userId);
        nettyMessage.setCode(code);
        nettyMessage.setMessage(message);
        redisService.convertAndSend(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC, nettyMessage);
        // NOTE(review): this is logged after publishing, not after delivery to the client.
        log.info("推送消息成功! userId:{},message:{}", userId, message);
    }
}

NettyMessage和PushSocketMsgDTO:

import com.alibaba.fastjson2.JSONObject;
import lombok.Data;

/**
 * Message published on the Redis topic for delivery through Netty.
 *
 * @author dsw
 * @date 2024/11/18 13:49
 */
@Data
public class NettyMessage {

    /** Message type code. */
    private Integer code;

    /** Id of the user the message is addressed to. */
    private String userId;

    /** Message body. */
    private JSONObject message;

}
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;

/**
 * DTO describing a socket message to push.
 *
 * @author dsw
 * @date 2024/11/18 13:39
 */
@Data
public class PushSocketMsgDTO {
    /**
     * Message type code; see NettyMsgEnum for the defined values.
     */
    private Integer code;
    /**
     * Ids of the target users.
     * The type is fully qualified because this file does not import java.util.List.
     */
    private java.util.List<Long> userIdList;
    /**
     * Message body.
     */
    private JSONObject message;
}

测试结果:

以上就是SpringCloud整合Netty集群实现WebSocket的示例代码的详细内容,更多关于SpringCloud Netty实现WebSocket的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文