java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > springboot netty心跳检测和自动重连

springboot整合netty实现心跳检测和自动重连

作者:如夜了我衣衫太薄便归家靠路灯°

本文主要介绍了Spring Boot中整合Netty实现心跳检测和自动重连,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1. 引入依赖

在 pom.xml 中添加 Netty 和 Spring Boot 相关依赖。

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <!-- Netty Dependency -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.63.Final</version>
    </dependency>
    
    <!-- 其他相关依赖 -->
</dependencies>

2. 配置 Netty 服务端

创建一个 Netty 服务器启动类,配置心跳检测机制。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class NettyServer implements CommandLineRunner {

    private final int port = 8080;

    @Override
    public void run(String... args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new IdleStateHandler(5, 7, 10, TimeUnit.SECONDS));
                     ch.pipeline().addLast(new HeartbeatHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)
             .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

3. 实现心跳检测处理器

创建一个 HeartbeatHandler 类处理心跳检测。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleState;

public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                System.out.println("读空闲");
                // 关闭连接
                ctx.close();
            } else if (event.state() == IdleState.WRITER_IDLE) {
                System.out.println("写空闲");
            } else if (event.state() == IdleState.ALL_IDLE) {
                System.out.println("读写空闲");
                // 发送心跳包
                ctx.writeAndFlush("ping\n");
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

4. 配置 Netty 客户端

创建一个 Netty 客户端启动类,实现自动重连和心跳检测。

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class NettyClient {

    private final String host = "localhost";
    private final int port = 8080;
    private final int MAX_RETRY = 5;
    private int retry = 0;

    public void start() {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.SO_KEEPALIVE, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
                     ch.pipeline().addLast(new ClientHeartbeatHandler());
                 }
             });

            connect(b);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void connect(Bootstrap b) {
        b.connect(host, port).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("连接服务器成功");
                } else {
                    System.out.println("连接服务器失败,尝试重连");
                    retry++;
                    if (retry < MAX_RETRY) {
                        future.channel().eventLoop().schedule(() -> connect(b), 2 << retry, TimeUnit.SECONDS);
                    } else {
                        System.out.println("重连失败次数达到最大,放弃连接");
                    }
                }
            }
        });
    }
}

5. 实现客户端心跳处理器

创建一个 ClientHeartbeatHandler 类处理心跳包。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleState;

public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                System.out.println("发送心跳包");
                ctx.writeAndFlush("ping\n");
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接断开,尝试重连");
        // 在这里实现重连逻辑
        // 比如: ctx.channel().eventLoop().schedule(() -> connect(), 5, TimeUnit.SECONDS);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

6. 启动 Spring Boot 应用

在 Spring Boot 的主类中启动 Netty 服务器和客户端。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class NettySpringBootApplication {

    @Autowired
    private NettyServer nettyServer;

    @Autowired
    private NettyClient nettyClient;

    public static void main(String[] args) {
        SpringApplication.run(NettySpringBootApplication.class, args);
    }

    @PostConstruct
    public void startNetty() {
        new Thread(() -> {
            try {
                nettyServer.run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> nettyClient.start()).start();
    }
}

关键点总结

到此这篇关于springboot整合netty实现心跳检测和自动重连的文章就介绍到这了,更多相关springboot 心跳检测和自动重连内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

您可能感兴趣的文章:
阅读全文