java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Netty开发粘包解决

Netty开发及粘包实战解决分析

作者:KerryWu

这篇文章主要为大家介绍了Netty开发及粘包实战解决分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

1. Netty介绍

Netty是一款开源的Java网络编程框架,广泛应用于很多高流量的服务器端应用程序:

Netty 目前最新版本是 4.1.95Final

很久之前 Netty就发布了 5 的测试版本,市场上都有很多介绍 Netty5 的书在卖了,但可惜问题太多,最终废弃了,目前依然只维护 4 的版本。

1.1. 组件

1.1.1. EventLoopGroup

EventLoopGroup 是一个线程池,用于管理和调度 EventLoop 对象。在 Netty 中,每个 EventLoopGroup 有一个或多个 EventLoop,用于处理连接请求和 I/O 操作,而每个EventLoop是单线程的。

所以Netty可以通过EventLoopGroup的构造调参,来实现不同的Reactor模型:

(1)既可也是单Reactor单线程模型:

EventLoopGroup group = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)

(2)也可以是 主从Reactor多线程模型:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(n);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)

 主从分工

1.1.2. EventLoop

EventLoop 则是事件循环的核心,负责监听和处理 Channel 中的事件和 I/O 操作。在 EventLoopGroup 中,每个 EventLoop 都是独立的,可以并发地处理多个 Channel 上的事件和 I/O 操作。

1.1.3. Channel 和 ByteBuf

定义

搭配使用

在 Netty 中,Channel 和 ByteBuf 是紧密结合的,通常一次数据传输会涉及到两个 Channel 和两个 ByteBuf 对象,分别代表了发送端和接收端的数据缓冲区。以下是 Channel 和 ByteBuf 的搭配使用流程:

1.1.4. ChannelPipeline 和 Channel

定义

每个 Channel 都有一个关联的 ChannelPipeline 对象,当有事件发生时,Netty 会将事件从 Channel 中传递到 ChannelPipeline 中,然后按照顺序依次触发各个事件处理器 ChannelHandler 的逻辑。当事件处理完毕后,Netty 会将处理结果返回到 Channel 中,以便进行数据的读写等操作。

在 ChannelPipeline 中,可以添加多个事件处理器,用于处理不同类型的事件和数据。例如,可以添加一个消息解码器、一个消息编码器、一个业务逻辑处理器等。每个事件处理器都可以进行特定的逻辑处理,并将处理结果传递给下一个事件处理器。

1.2. 网络协议

Netty是一个非常强大和灵活的网络编程框架,它支持多种通信协议。以下是一些Netty支持的通信协议:

因为 Netty 支持的网络协议丰富,所以当有非Http协议网络通信的需求时,大家第一时间会想到 Netty。

2. 代码示例

2.1. 基于tcp协议

2.1.1.服务端

pom

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.63.Final</version>
        </dependency>

服务端

@Component
public class NettyServer {
    // 创建两个线程组,分别用于接收客户端连接和处理网络IO操作
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                // 指定使用 NioServerSocketChannel 作为通道实现
                .channel(NioServerSocketChannel.class)
                // 定义 ChannelPipeline(多个ChannelHandler组合)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });
        // 绑定端口,开始接收进来的连接
        ChannelFuture f = b.bind(8080).sync();
        if (f.isSuccess()) {
            System.out.println("启动Netty服务成功,端口号:" + 8080);
        }
    }
    @PreDestroy
    public void shutdown() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

ChannelHandler 消息处理

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("Received message from client: " + msg);
        // 回复消息给客户端
        //ctx.writeAndFlush("Received your message: " + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

2.1.2.客户端

客户端

@DependsOn({"nettyServer"})
@Component
public class NettyClient {
    private EventLoopGroup group;
    private Channel channel;
    @PostConstruct
    public void start() throws InterruptedException {
        group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
        ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync();
        if (future.isSuccess()) {
            System.out.println("连接服务器成功");
        }
        channel = future.channel();
    }
    @PreDestroy
    public void destroy() {
        if (group != null) {
            group.shutdownGracefully();
        }
    }

ChannelHandler 消息处理

public class ClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        System.out.println("Server response:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

2.2. 基于Unix Socket协议

其他的不变,这里只关注客户服务端代码。

2.2.1. 代码

服务端

private final EventLoopGroup bossGroup = new KQueueEventLoopGroup();
    private final EventLoopGroup workerGroup = new KQueueEventLoopGroup();

    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(KQueueServerDomainSocketChannel.class)
                .childHandler(new ChannelInitializer<KQueueDomainSocketChannel>() {
                    @Override
                    public void initChannel(KQueueDomainSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });
        ChannelFuture f = b.bind(new DomainSocketAddress("/tmp/test.sock")).sync();
        if (f.isSuccess()) {
            System.out.println("启动Netty服务成功,文件:" + "/tmp/test.sock");
        }
    }

客户端

public void start() throws InterruptedException {
        group = new KQueueEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(KQueueDomainSocketChannel.class)
                .handler(new ChannelInitializer<KQueueDomainSocketChannel>() {
                    @Override
                    protected void initChannel(KQueueDomainSocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });

        ChannelFuture future = bootstrap.connect(new DomainSocketAddress("/tmp/test.sock")).sync();

        if (future.isSuccess()) {
            System.out.println("连接服务器成功");
        }

        channel = future.channel();
    }

2.2.2. 分析

Unix Socket协议

Unix Domain Socket(简称UDS)是一个用于实现本地进程间通信的协议。与使用网络套接字(socket)进行通信不同,UDS仅用于同一台机器上的相邻进程之间的通信。

在Unix/Linux系统中,UDS通常被用于代替TCP/IP套接字来提高性能和安全性。不过它们可以通过文件系统路径来建立连接,不能跨机器通信。

Netty中协议切换

通过对比上述代码,可以看出netty中切换协议是比较简单的,换成对应的 Channel 实现类,以及连接方式就可以了。

因为是mac中运行,示例代码中用KQueueDomainSocketChannel替代DomainSocketChannel

2.3. 测试

Controller发消息

@RestController
public class MsgController {
    @Autowired
    private NettyClient nettyClient;

    @PostMapping("/send")
    public ResponseEntity<Void> sendMsg(@RequestBody String msg) {
        System.out.println(msg.getBytes(StandardCharsets.UTF_8).length);
        try {
            for (int i = 0; i < 1000; i++) {
                nettyClient.send(msg);
            }
            return new ResponseEntity<>(HttpStatus.OK);
        } catch (Exception e) {
            return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }
}

测试结果

前面已经基于 TCP协议写好了netty的客户端、服务端,
现在写接口,可以通过客户端给服务端发消息,不过单次调用会一次性发1000遍。

接口调用传入:hello

预期结果:

Received message from client: hello
Received message from client: hello
... ... // 同样输出1000遍

实际结果:

Received message from client: hello
Received message from client: hellohello
Received message from client: hellohe
Received message from client: llohellohellohellohello
Received message from client: hellohellohello
... ... // 无规则

出现问题的原因就是下一章要将的粘包、拆包问题。

3. 粘包、拆包问题

3.1. 问题分析

3.1.1. tcp协议出现问题的原因

粘包/拆包问题是由TCP协议本身造成的,和Netty本身无关,任何基于TCP协议实现数据传输的技术都会面临这个问题,原因如下:

解决粘包和拆包问题的基本策略就是在应用层引入数据边界。常见的方法有:固定长度、分隔符、在包头中加入长度字段等。

3.1.2. 其他协议为什么没问题

HTTP 协议

HTTP 协议 基于 TCP 协议构建,而 TCP 是一种面向流的协议,所以理论上可能会有粘包问题。但是在实际应用中,HTTP 协议已经做了明确的分包处理,因此通常不需要开发者去处理粘包问题,HTTP 使用了一些特定的方式来定义数据包的边界:

对于 HTTP/1.0 和 HTTP/1.1,一次完整的 HTTP 交互由一个请求和一个响应组成,它们都是相对独立的。请求和响应都有明确的开始行(请求行或状态行)和结束标志(如 Content-Length 头或 chunked 编码表示的消息体长度)。这样可以很清楚地知道报文的开始和结束,避免了粘包问题。
对于 HTTP/2,它引入了二进制帧的概念。每个帧有明确的长度和类型,这也使得在接收端可以准确地解析出各个帧,避免粘包问题。

UDP 协议

UDP 协议 是一种无连接的、不可靠的协议,它并没有像TCP协议那样提供流量控制和拥塞控制等功能,因此在传输过程中可能会出现丢包或乱序等问题。由于UDP协议采用数据报方式进行传输,每个UDP数据报都有独立的头部标识,因此不会出现粘包问题。

WebSocket 协议

WebSocket 协议 建立连接后,客户端和服务器之间会保持长时间的连接状态,可以随时发送和接收数据。当服务器发送数据时,会将数据封装到一个完整的WebSocket帧中,并通过TCP协议进行传输。而客户端收到数据后,会从WebSocket帧中解析出数据,并进行相应处理。这样就避免了TCP协议中的“粘包”和“拆包”问题。

3.1.3. Unix Socket 为什么也有问题

Unix Socket(也被称为 Unix Domain Socket,UDS)主要支持以下两种类型的通信协议:

Unix Socket 的这两种模式在行为上与 TCP 和 UDP 很相似。因此在基于 SOCK_STREAM 协议使用 Netty 开发服务端和客户端时,可能会出现类似粘包的问题。

前面有现成的基于Unix Stream协议实现的代码,我们同样调用接口试一下,发现 Unix Socket 同样会产生粘包问题

解决思路

结合HTTP、UDP、WebSocket 解决粘包/拆包问题的思路,同样也可以推导解决TCP问题的思路:在发送数据时,应该设计一种协议来确定消息的边界,比如:添加特殊的分隔符,或者在每个消息的头部包含消息的长度等。

基于这个思路,Netty 框架提供了 LineBasedFrameDecoder、DelimiterBasedFrameDecoder和 LengthFieldBasedFrameDecoder等解决方案,下面一一介绍。

3.2. 解决方案

3.2.1. LineBasedFrameDecoder

使用行结束符作为数据包的分隔符。每条消息后面都有一个行结束符(例如 \n 或 \r\n),它会一直读取字节直到遇到这个结束符,然后把之前读取到的字节组装成一条消息。

如果没有找到行结束符,那么就认为当前还没有读取到完整的数据包,需要将已经读取到的字节保存起来,等待下次读取。

代码-客户端修改

发送消息的方法中,每条消息结尾都加上行结束符后缀:

public void send(String msg) {
        if (channel != null) {
            channel.writeAndFlush(msg + "\\n");
        } else {
            System.out.println("message sending failed, connection not established");
        }
    }

 代码-服务端修改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

 局限性

3.2.2. DelimiterBasedFrameDecoder

解决方式

和LineBasedFrameDecoder类似,当接收到数据时,会检查是否存在分隔符。如果存在,它就认为已经读取到了一个完整的消息,并将这个消息传递给下一个ChannelHandler进行处理。如果不存在,它将继续等待,直到读取到分隔符。

区别在于,前者的分隔符固定,而它的分隔符可以自定义。

代码-客户端修改

发送消息的方法中,每条消息结尾都加上行结束符后缀:

public void send(String msg) {
        if (channel != null) {
            channel.writeAndFlush(msg + "$_");
        } else {
            System.out.println("message sending failed, connection not established");
        }
    }

 代码-服务端修改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

局限性

3.2.3. FixedLengthFrameDecoder

解决方式

工作原理主要是每次从 ByteBuf 中读取固定长度的字节,然后构造成一个独立的 frame 对象,传递给下一个 handler 处理。

这样可以确保不会因为 TCP 粘包导致多个消息被当作一个消息处理,也不会因为 TCP 拆包导致一个消息被当作多个消息处理。

代码-服务端修改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ch.pipeline().addLast(new FixedLengthFrameDecoder(5));

因为传输的参数“hello”是5个字节,这类就固定为5.

局限性

3.2.4. LengthFieldBasedFrameDecoder

解决方式

在网络通信中,发送和接收数据需要遵循同一种协议。LengthFieldBasedFrameDecoder 是一个基于长度字段的解码器,而 LengthFieldPrepender 则是一个对应的编码器,它会在消息体前面加上一个长度字段。

它们一般会配套使用,这样发送端发送的数据和接收端接收的数据结构就会保持一致,从而能够正确地进行解码。

代码-客户端修改

添加ChannelHandler实现,通过LengthFieldPrepender这个编码器,在发送的消息前添加长度字段(这里的 4 是指长度字段本身占用的字节数量):

socketChannel.pipeline().addLast(new LengthFieldPrepender(4));

代码-服务端修改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));

4. Netty特性优化

4.1. 内存池 PooledByteBufAllocator

内存池是一种用于管理和复用内存块的技术。可以避免频繁地分配和释放内存,从而减少系统开销和内存碎片问题,提高系统的效率和性能。

PooledByteBufAllocator(分配器 [ˈæləˌkeɪtər]) 是 Netty 提供的一个基于内存池的 ByteBuf 分配器。与直接创建新的 ByteBuf 实例相比,PooledByteBufAllocator 提供了重用内存的能力,这可以显著减少内存分配和垃圾收集的开销,提高性能:

通过这些方式,PooledByteBufAllocator 可以有效地复用内存,提高了内存使用的效率和性能。

PooledByteBufAllocator 创建 ByteBuf 过程

PooledByteBufAllocator allocator = new PooledByteBufAllocator();

// 分别分配堆内存、堆外内存,内存大小也可以指定,如: allocator.heapBuffer(1024);
ByteBuf heapBuffer = allocator.heapBuffer();
ByteBuf directBuffer = allocator.directBuffer(); 

// 正常将写入数据或读取
heapBuffer.writeBytes(data);
byte b = heapBuffer.readByte();

// 记得不用时释放内存,堆外内存不受垃圾回收,不释放会有内存泄露
heapBuffer.release();
directBuffer.release();

不过实际项目中,很少有见过通过创建 PooledByteBufAllocator,再创建 ByteBuf 的。

基本都是由 Unpooled 工具类 创建 ByteBuf。

创建:堆内内存 OR 堆外内存?

(1)堆内内存:

如果你需要处理的数据比较小(比如几 KB 或几百 KB),而且需要进行频繁的读写操作,那么建议使用堆内内存。

(2)堆外内存:

如果你需要处理的数据比较大(比如几 MB 或几十 MB),而且需要进行频繁的 IO 操作,那么建议使用堆外内存。堆外内存是由操作系统管理的,数据存储在操作系统的内存中,可以直接进行 IO 操作。此外,在使用堆外内存时,可以避免 Java 堆和操作系统之间的数据拷贝,减少了系统的开销和延迟。

需要注意的是,堆外内存的申请和释放需要调用 JNI 接口,因此申请和释放堆外内存的开销会比较高。因此一般来说:

对于小规模的数据处理应用,建议使用堆内内存;对于大规模的数据处理应用,建议使用堆外内存

4.2. 内存池 Unpooled

Unpooled 是 Netty 中一个工具类,用于创建不同类型的 ByteBuf 对象,而且同样是使用PooledByteBufAllocator 类来分配和管理内存。

只不过它提供了一些静态方法,可以很方便地创建 HeapBuf、DirectBuf、CompositeBuf 等类型的 ByteBuf 对象。常见方法:

不过同样要记得在使用完毕后,应该及时调用 release() 方法来释放 ByteBuf 对象的资源哦。

回顾一下:考虑到Netty中 ByteBuf 等常用类,为避免频繁地分配和释放内存,通过内存池实现内存复用。但 ByteBuf 也是类,频繁地创建、销毁对象同样有大量的性能开销,怎么优化?

那么接下来我们看一下 对象池。

4.3. 对象池 Recycler

Recycler (回收器,[ˌriːˈsaɪkl] )是 Netty是一个对象池,主要用于重用对象,避免频繁创建和销毁带来的性能开销。被广泛地应用于各种场景中,例如 ByteBuf 对象池、EventExecutor 对象池、ChannelHandlerContext 对象池等等。我们还是来看看 ByteBuf。

ByteBuf 中包含一个 Recycler.Handle 对象,用于管理 ByteBuf 对象池的创建和销毁。当需要创建一个新的 ByteBuf 对象时,无论通过前面介绍的PooledByteBufAllocator、Unpooled,都是通过 ByteBufAllocator 接口提供的 directBuffer() 或 heapBuffer() 等方法来创建。

这些方法就是基于Recycler,会自动从线程本地的对象池中获取一个 ByteBuf 对象,如果对象池为空,则会创建一个新对象,并将其加入对象池中。当不再需要这个对象时,可以通过调用 release() 方法将其回收到对象池中,等待下次使用。

ChannelHandlerContext 对象池也类似,在 Netty 中,可以通过 ChannelHandlerContext 的 newContext() 方法来获取一个新的 ChannelHandlerContext 对象,这个方法会从 Recycler 对象池中获取一个 ChannelHandlerContext 对象并进行初始化,如果没有可用的对象,则会创建一个新对象。在使用完后,通过调用 ChannelHandlerContext 的 recycle() 方法将其回收到对象池中,等待下次使用。

当然 Recycler 是 Netty 中实现对象池的机制,并不局限于只有 Netty 的这些组件类可以用,任何我们自定义的类都可以。下面看一个例子。

示例(任何对象)

public class UserCache {
    private static final Recycler<User> userRecycler = new Recycler<User>() {
        @Override
        protected User newObject(Handle<User> handle) {
            return new User(handle);
        }
    };
    static final class User {
        private String name;
        private Recycler.Handle<User> handle;
        public void setName(String name) {
            this.name = name;
        }
        public String getName() {
            return name;
        }
        public User(Recycler.Handle<User> handle) {
            this.handle = handle;
        }
        public void recycle() {
            handle.recycle(this);
        }
    }
    public static void main(String[] args) {
        User user1 = userRecycler.get();
        user1.setName("hello");
        user1.recycle();
        User user2 = userRecycler.get();
        System.out.println(user1 == user2);
    }
}

左边的例子中,我们定义了一个User类,main方法中,user1.recycle(),user1回收了之后,然后 user2 再获取。

线程安全

另外,Recycler 使用线程本地变量(FastThreadLocal)来存储对象,每个线程都有一个独立的对象池。这个机制可以保证对象的安全性和线程互相独立,避免了线程安全问题和竞争条件的出现。

那么这个 FastThreadLocal 是啥?和常见的 ThreadLocal 有啥关系呢?

4.4. 本地线程优化 FastThreadLocal

FastThreadLocal(更快的ThreadLocal) 是 Netty 自己研发的一个工具类,用于替换 Java 原生的 ThreadLocal。主要有以下几个原因:

 代码示例

public class FastThreadLocalDemo {
    private static final FastThreadLocal<Integer> THREAD_LOCAL = new FastThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() throws Exception {
            return 1;
        }
    };
    public static void main(String[] args) {
        new FastThreadLocalThread(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + " --> " + THREAD_LOCAL.get());
                THREAD_LOCAL.set(THREAD_LOCAL.get() + 1);
            }
        }, "FastThreadLocalThread-1").start();
    }
}

注意事项

FastThreadLocal 的使用方式和 ThreadLocal差别不大,但是有几点需要注意:

虽说在使用了 FastThreadLocalThread 实例的情况下,在线程结束时,FastThreadLocal 会自动清理所有线程局部变量。但显式地调用 remove() 方法仍然是一个好的实践。特别是在长生命周期的线程或者使用了线程池的情况下,显式地清理线程局部变量可以帮助避免潜在的内存泄漏问题。

以上就是Netty开发及粘包实战解决分析的详细内容,更多关于Netty开发粘包解决的资料请关注脚本之家其它相关文章!

以上就是Netty开发及粘包实战解决分析的详细内容,更多关于Netty开发粘包解决的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文