springboot之springboot与netty整合方案
作者:RachelHwang
1、简单概述
Netty是一个高性能、异步事件驱动的NIO框架,基于JAVA NIO提供的API实现。
它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty的NIO框架构建。
Netty 对 JDK 自带的 NIO 的 API 进行封装,解决上述问题,主要特点有:
- 设计优雅,适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型,单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。
- 使用方便,详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
- 高性能,吞吐量更高,延迟更低;减少资源消耗;最小化不必要的内存复制。
- 安全,完整的 SSL/TLS 和 StartTLS 支持。
- 社区活跃,不断更新,社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。
2、IO模型
Netty 作为异步事件驱动的网络,高性能之处主要来自于其 I/O 模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据。
I/O 复用模型:
在 I/O 复用模型中,会用到 Select,这个函数也会使进程阻塞,但是和传统的JAVA阻塞 I/O 所不同的是这两个函数可以同时阻塞多个 I/O 操作。
而且可以同时对多个读操作,多个写操作的 I/O 函数进行检测,直到有数据可读或可写时,才真正调用 I/O 操作函数。
Netty 的非阻塞 I/O 的实现关键是基于 I/O 复用模型,这里用 Selector 对象表示:
Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,可以同时并发处理成百上千个客户端连接。
当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
3、事件模型
下图描述了Netty进行事件处理的流程。
Channel是连接的通道,是ChannelEvent的产生者,而ChannelPipeline可以理解为ChannelHandler的集合。
在Netty里,所有事件都来自ChannelEvent接口,这些事件涵盖监听端口、建立连接、读写数据等网络通讯的各个阶段。
而事件的处理者就是ChannelHandler,这样,不但是业务逻辑,连网络通讯流程中底层的处理,都可以通过实现ChannelHandler来完成了。
事实上,Netty内部的连接处理、协议编解码、超时等机制,都是通过handler完成的。
3、springboot与netty集成
3.1 添加netty+springboot依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.28.Final</version> </dependency>
3.2 netty服务端配置
/** * 服务端基本配置,通过一个静态单例类,保证启动时候只被加载一次 */ @Component public class WsServer { /** * 单例静态内部类 */ public static class SingletionWServer{ static final WsServer instance = new WsServer(); } public static WsServer getInstance(){ return SingletionWServer.instance; } private EventLoopGroup mainGroup ; private EventLoopGroup subGroup; private ServerBootstrap server; private ChannelFuture future; public WsServer(){ mainGroup = new NioEventLoopGroup(); subGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(mainGroup, subGroup) .channel(NioServerSocketChannel.class) .childHandler(new WsServerInitialzer());//添加自定义初始化处理器 } public void start(){ future = this.server.bind(8081); System.err.println("netty 服务端启动完毕 ....."); } }
自定义初始化处理器: WsServerInitialzer
public class WsServerInitialzer extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //websocket基于http协议,所以需要http编解码器 pipeline.addLast(new HttpServerCodec()); //添加对于读写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //对httpMessage进行聚合 pipeline.addLast(new HttpObjectAggregator(1024*64)); // ================= 上述是用于支持http协议的 ============== //websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址 //比如处理一些握手动作(ping,pong) pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); //自定义handler pipeline.addLast(new ChatHandler()); } }
自定义handler类: ChatHandler
/** * 上文中需要自定义处理的handler * TextWebSocketFrame 用于为websockt处理文本的对象 */ public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ //用于记录和管理所有客户端的channel private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //客户端传递过来的消息 String content = msg.text(); System.out.println("接收到了客户端的消息是:" + content); //将客户端发送过来的消息刷到所有的channel中 for(Channel channel : clients){ channel.writeAndFlush( new TextWebSocketFrame("[服务器接收到了客户端的消息:]" + LocalDateTime.now()+",消息为:" + content)); } } //客户端创建的时候触发,当客户端连接上服务端之后,就可以获取该channel,然后放到channelGroup中进行统一管理 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { clients.add(ctx.channel()); } //客户端销毁的时候触发, @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //当handlerRemoved 被触发时候,channelGroup会自动移除对应的channel //clients.remove(ctx.channel()); System.out.println("客户端断开,当前被移除的channel的短ID是:" +ctx.channel().id().asShortText()); } }
配置springboot启动加载netty服务
/** * netty服务端启动加载配置 */ @Component public class NettybootServerInitConfig implements ApplicationListener<ContextRefreshedEvent>{ @Override public void onApplicationEvent(ContextRefreshedEvent event) { if(event.getApplicationContext().getParent() == null){ WsServer.getInstance().start(); } } }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。