How to Integrate Netty with Spring Boot
Author: Sea-Man
This article mainly explains how to integrate Netty with Spring Boot. It is intended as a practical reference; if anything is wrong or incomplete, corrections are welcome.
1. pom Dependencies
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.77.Final</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.5.8</version>
</dependency>
2. Configure the yml File
server:
  port: 8001
  servlet:
    context-path: /netty
netty:
  url: 0.0.0.0   # 0.0.0.0 means bind to any local IP
  port: 20004
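The yml above exposes the bind address and port under the netty prefix, while the NettyServer shown in the next section is started from a plain main method and binds port 9000 directly. A minimal sketch of bridging the two, assuming the main() logic is extracted into a hypothetical NettyServer.start(host, port) method, is a CommandLineRunner that reads the properties and launches the server on its own thread:

package com.tlxy.lhn.controller.netty;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * Sketch only: reads netty.url / netty.port from application.yml and starts the
 * Netty server on a separate thread so it does not block Spring Boot startup.
 * NettyServer.start(host, port) is a hypothetical extraction of the main() logic below.
 */
@Component
public class NettyBootstrapRunner implements CommandLineRunner {

    @Value("${netty.url}")
    private String url;

    @Value("${netty.port}")
    private int port;

    @Override
    public void run(String... args) {
        new Thread(() -> {
            try {
                NettyServer.start(url, port);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "netty-server").start();
    }
}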
3. The Server
package com.tlxy.lhn.controller.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        // Create two event loop groups, bossGroup and workerGroup. By default each group
        // holds CPU cores * 2 NioEventLoop threads; here we pass 1 explicitly.
        // bossGroup only accepts connection requests; the actual client I/O and business
        // handling is delegated to workerGroup.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);
        try {
            // Create the server bootstrap object
            ServerBootstrap bootstrap = new ServerBootstrap();
            // Configure the parameters with the fluent API
            bootstrap.group(bossGroup, workerGroup) // set the two thread groups
                    .channel(NioServerSocketChannel.class) // use NioServerSocketChannel as the server channel implementation
                    // Size of the server's pending-connection queue. The server accepts client
                    // connections sequentially, so when several clients arrive at the same time,
                    // the ones it cannot handle yet wait in this queue.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            // Register the handler on each SocketChannel handled by workerGroup
                            channel.pipeline().addLast(new NettyServerHandler());
                        }
                    });

            System.out.println("netty server start..");
            // Bind a port and obtain a ChannelFuture; isDone() and related methods report the
            // state of the asynchronous operation. bind() is asynchronous; sync() waits until
            // the bind has completed.
            ChannelFuture cf = bootstrap.bind(9000).sync();
            // Register a listener on cf for the events we care about
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        System.out.println("Listening on port 9000 succeeded");
                    } else {
                        System.out.println("Listening on port 9000 failed");
                    }
                }
            });
            // Wait until the server channel is closed. closeFuture() is asynchronous;
            // sync() blocks until the close has completed (internally it uses Object.wait()).
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
The line
channel.pipeline().addLast(new NettyServerHandler());
in the NettyServer class corresponds to the following handler.
package com.tlxy.lhn.controller.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("Message from client: " + buf.toString(CharsetUtil.UTF_8));
        // read the ByteBuf
        // business logic
        // reply to the client
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    // Executed whenever Netty raises an error; when Netty drops a connection it throws
    // a connection-timeout error.
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("Closing the channel");
        cause.printStackTrace();
        ctx.close();
    }
}
4. The Client
package com.tlxy.lhn.controller.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        // The client needs one event loop group
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            // Create the client bootstrap object.
            // Note that the client uses Bootstrap, not ServerBootstrap.
            Bootstrap bootstrap = new Bootstrap();
            // Configure the parameters
            bootstrap.group(group) // set the thread group
                    .channel(NioSocketChannel.class) // use NioSocketChannel as the client channel implementation
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("netty client start..");
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
The line
ch.pipeline().addLast(new NettyClientHandler());
in the NettyClient class registers the handler shown below.
package com.tlxy.lhn.controller.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Fired once the client connection has been established
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    // Triggered when data arrives on the established channel, i.e. the server sends data to the client
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("Message from server: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("Server address: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("Closing the channel");
        cause.printStackTrace();
        ctx.close();
    }
}
5. Sticky and Split Packets
The client and server skeletons above are fixed boilerplate; only the handlers need to be written for each use case.
For TCP sticky and split packets, you can frame the data yourself, for example by sending fixed-length messages, or serialize with Protostuff, a runtime serialization library built on top of Google's Protocol Buffers. A sketch of Netty's own framing handlers is shown below as well.
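As an alternative that is not part of the original code, Netty ships ready-made framing handlers in io.netty.handler.codec. A sketch of the server's initChannel using a 4-byte length prefix (the client side would mirror it) looks like this:

// Sketch only: LengthFieldBasedFrameDecoder re-assembles inbound frames from a
// 4-byte length prefix (max 64 KB per frame) and strips the prefix;
// LengthFieldPrepender adds the prefix to every outbound message, so
// channelRead always sees exactly one complete message.
@Override
protected void initChannel(SocketChannel channel) throws Exception {
    channel.pipeline()
            .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
            .addLast(new LengthFieldPrepender(4))
            .addLast(new NettyServerHandler());
}

For the Protostuff route, the article uses the following dependencies and utility class.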
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-api</artifactId>
    <version>1.0.8</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.8</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.8</version>
</dependency>
package com.tlxy.lhn.controller.netty;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProtostuffUtil {

    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * Serialize an object into a byte array
     *
     * @param obj
     * @return
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserialize a byte array back into an object of the given class
     *
     * @param data
     * @param clazz
     * @return
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T obj = clazz.newInstance();
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}
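The original does not show how ProtostuffUtil is wired into the pipeline. A minimal sketch, assuming a hypothetical Message POJO, wraps it in Netty's MessageToByteEncoder and ByteToMessageDecoder:

package com.tlxy.lhn.controller.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;

import java.util.List;

/**
 * Sketch only: encoder/decoder pair that moves a hypothetical Message POJO
 * through the pipeline with ProtostuffUtil.
 */
public class ProtostuffCodec {

    public static class Encoder extends MessageToByteEncoder<Message> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) {
            // serialize the POJO and write the raw bytes to the outbound buffer
            out.writeBytes(ProtostuffUtil.serializer(msg));
        }
    }

    public static class Decoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            // assumes an upstream frame decoder has already delivered one complete message
            byte[] bytes = new byte[in.readableBytes()];
            in.readBytes(bytes);
            out.add(ProtostuffUtil.deserializer(bytes, Message.class));
        }
    }
}

In the pipeline, the Decoder goes after the LengthFieldBasedFrameDecoder and the Encoder after the LengthFieldPrepender; since outbound writes travel toward the head of the pipeline, each outgoing Message is serialized first and then length-prefixed.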
Summary
The above is based on personal experience. I hope it serves as a useful reference, and I hope you will continue to support 脚本之家.