Netty中的心跳检测机制详解
作者:warybee
Netty心跳检测机制
1 心跳检测使用场景
长连接的应用场景非常的广泛,比如监控系统,IM系统,即时报价系统,推送服务等等。像这些场景都是比较注重实时性,如果每次发送数据都要进行一次DNS解析,建立连接的过程肯定是极其影响体验。
而长连接的维护必然需要一套机制来控制。比如 HTTP/1.0 通过在 header 头中添加 Connection:Keep-Alive参数,如果当前请求需要保活则添加该参数作为标识,否则服务端就不会保持该连接的状态,发送完数据之后就关闭连接。HTTP/1.1以后 Keep-Alive 是默认打开的。
Netty 是 基于 TCP 协议开发的,在四层协议 TCP 协议的实现中也提供了 keepalive 报文用来探测对端是否可用。TCP 层将在定时时间到后发送相应的 KeepAlive 探针以确定连接可用性。
Netty 中提供了 tcp-keepalive 的设置:
.childOption(ChannelOption.SO_KEEPALIVE,true) 表示打开 TCP 的 keepAlive 设置。
2 Netty心跳检测机制
Netty 中提供了 IdleStateHandler 类专门用于处理心跳。构造函数如下:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) { this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); }
参数说明:
- readerIdleTime 隔多久检查一下读事件是否发生,如果 channelRead() 方法超过 readerIdleTime 时间未被调用则会触发超时事件调用 userEventTrigger() 方法
- writerIdleTime 隔多久检查一下写事件是否发生,如果 write() 方法超过 writerIdleTime 时间未被调用则会触发超时事件调用 userEventTrigger() 方法;
- allIdleTime 隔多久检查读写事件是否发生
- unit 时间单位
可以分别控制读,写,读写超时的时间,如果设置为0表示不检测,所以如果全是0,则相当于没添加这个 IdleStateHandler,连接是个普通的短连接。
2.1 代码演示
服务端
public class TestHeartServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap=new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,true) .handler(new LoggingHandler(LogLevel.INFO))//bossGroup处理handler .childHandler(new ChannelInitializer<SocketChannel>() {//workergroup处理handler @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //每隔5s检查一下是否有读事件发生 pipeline.addLast(new IdleStateHandler(5,0,0, TimeUnit.SECONDS)); pipeline.addLast(new TestHeartServerHandler()); } }); ChannelFuture channelFuture = bootstrap.bind(9999).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
服务端handler
public class TestHeartServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf=(ByteBuf) msg; System.out.println("客户端消息:"+buf.toString(StandardCharsets.UTF_8)); //向客户端发送消息 //ctx.writeAndFlush(Unpooled.copiedBuffer("heart",StandardCharsets.UTF_8)); } /** *如果5s没有读请求,则向客户端发送心跳 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; switch (event.state()) { case READER_IDLE: //读空闲 //如果5s没有读请求,则向客户端发送心跳 ctx.writeAndFlush("server send Heartbeat").addListener(ChannelFutureListener.CLOSE_ON_FAILURE); break; case WRITER_IDLE://写空闲 break; case ALL_IDLE://读写空闲 break; } } }
客户端
public class TestHeartClient { public static void main(String[] args) { EventLoopGroup eventExecutors=new NioEventLoopGroup(); try { Bootstrap bootstrap=new Bootstrap(); bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //每隔4s检查一下是否有写事件 pipeline.addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS)); pipeline.addLast(new TestHeartClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync(); //向服务端发送消息 channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("Hello server, i'm online", StandardCharsets.UTF_8)); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } }
客户端Handler
public class TestHeartClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf=(ByteBuf) msg; System.out.println("服务端发送的消息:"+buf.toString(StandardCharsets.UTF_8)); } /** * * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; switch (event.state()) { //读空闲 case READER_IDLE: break; case WRITER_IDLE://写空闲 //如果4s没有收到写请求,则向服务端发送心跳请求 ctx.writeAndFlush(Unpooled.copiedBuffer("client send Heartbeat",StandardCharsets.UTF_8)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ; break; case ALL_IDLE://读写空闲 break; } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
解释一下代码的逻辑:
服务端添加了:
Copypipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
每隔5s检查一下是否有读事件发生,如果没有就触发 handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)逻辑。
客户端添加了:
Copynew IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)
每隔4s检查一下是否有写事件,如果没有就触发 handler 中的 userEventTriggered(ChannelHandlerContext ctx, Object evt)逻辑。
到此这篇关于Netty中的心跳检测机制详解的文章就介绍到这了,更多相关Netty心跳检测机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!