java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Netty解决半包和粘包

Netty解决半包和粘包问题的方案

作者:猿java

Netty 是一个高性能、异步事件驱动的网络应用框架,广泛应用于各种网络通信场景,这篇文章,我们将详细分析 Netty 是如何解决半包和粘包问题,文中通过代码示介绍的非常详细,需要的朋友可以参考下

什么是半包和粘包?

半包问题

半包问题是指一个完整的应用层消息被分成多个 TCP 数据包发送,接收端在一次读取操作中只接收到消息的一部分。

例如,发送端发送了一条 100 字节的消息,但由于网络原因,这条消息被拆分成了两个 TCP 数据包,一个 60 字节,另一个 40 字节。接收端可能在第一次读取时只接收到前 60 字节的数据,剩下的 40 字节需要在后续的读取操作中才能接收到。

粘包问题

粘包问题是指多个应用层消息在传输过程中被粘在一起,接收端在一次读取操作中接收到大于 1个消息的情况。

例如,发送端发送了两条消息,每条 50 字节,但接收端在一次读取操作中收到了 80 字节的数据,超过了 1条消息的内容。

产生原因

产生半包和粘包问题主要是以下 3个原因:

示例

假设发送端发送了两条消息:

在半包情况下,接收端可能会这样接收:

在粘包情况下,接收端可能会这样接收:

解决方案

基于固定长度的解码器

基于固定长度的解码器是指发消息时,每条消息的长度固定,读消息时也通过固定长度来读取消息,从而解决半包和粘包问题。

实现方式

Netty 提供了 FixedLengthFrameDecoder 类来实现这一功能,核心源码如下:

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= frameLength) {
            ByteBuf buf = in.readBytes(frameLength);
            out.add(buf);
        }
    }
}

注意点

使用定长帧需要注意以下几点:

优点

缺点

示例

下面我们通过一个示例来展示使用定长帧是如何解决半包粘包问题的。

发送端,确保每个消息的长度固定。如果实际消息长度不足,可以使用填充字符(如空格)来补齐。

public class FixedLengthFrameSender {

    private static final int FRAME_LENGTH = 10; // 固定消息长度

    public static void send(Channel channel, String message) {
        // 确保消息长度不超过固定长度
        if (message.length() > FRAME_LENGTH) {
            throw new IllegalArgumentException("Message too long");
        }
        // 使用空格填充消息到固定长度
        String paddedMessage = String.format("%-" + FRAME_LENGTH + "s", message);
        
        // 将消息转换为字节数组并发送
        ByteBuf buffer = Unpooled.copiedBuffer(paddedMessage.getBytes());
        channel.writeAndFlush(buffer);
    }
}

接收端,使用 Netty 提供的 FixedLengthFrameDecoder 解码器来处理固定长度的消息。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class FixedLengthFrameReceiver {
    private static final int FRAME_LENGTH = 10; // 固定消息长度

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     // 添加定长帧解码器
                     p.addLast(new FixedLengthFrameDecoder(FRAME_LENGTH));
                     // 添加自定义处理器
                     p.addLast(new FixedLengthFrameHandler());
                 }
             });
            // 启动服务器
            b.bind(8888).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static class FixedLengthFrameHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf in = (ByteBuf) msg;
            byte[] receivedBytes = new byte[in.readableBytes()];
            in.readBytes(receivedBytes);
            String receivedMsg = new String(receivedBytes).trim(); // 去除填充字符
            System.out.println("Received: " + receivedMsg);
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

基于换行符解码器

自定义分隔符解码器

基于换行符解码器和自定义分隔符解码器(比如 特殊字符)来划分消息边界,从而解决半包和粘包问题,使用者可以根据自己的需求灵活确定分隔符。

实现方式

Netty 提供了 DelimiterBasedFrameDecoder 类来实现这一功能,核心源码如下:

public DelimiterBasedFrameDecoder(
        int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
   validateMaxFrameLength(maxFrameLength);
   ObjectUtil.checkNonEmpty(delimiters, "delimiters");

   if (isLineBased(delimiters) && !isSubclass()) {
      lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
      this.delimiters = null;
   } else {
      this.delimiters = new ByteBuf[delimiters.length];
      for (int i = 0; i < delimiters.length; i ++) {
         ByteBuf d = delimiters[i];
         validateDelimiter(d);
         this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
      }
      lineBasedDecoder = null;
   }
   this.maxFrameLength = maxFrameLength;
   this.stripDelimiter = stripDelimiter;
   this.failFast = failFast;
}

注意点

优点

缺点

示例

下面我们通过一个示例来展示使用分隔符是如何解决半包粘包问题的。

发送端,确保每个消息以特定的分隔符结尾。常用的分隔符包括换行符(\n)、特定字符(如 |)等。

public class DelimiterBasedFrameSender {

    private static final String DELIMITER = "\n"; // 分隔符

    public static void send(Channel channel, String message) {
        // 在消息末尾添加分隔符
        String delimitedMessage = message + DELIMITER;
        
        // 将消息转换为字节数组并发送
        ByteBuf buffer = Unpooled.copiedBuffer(delimitedMessage.getBytes());
        channel.writeAndFlush(buffer);
    }
}

接收端,使用 Netty 提供的 DelimiterBasedFrameDecoder 解码器来处理以分隔符结尾的消息。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class DelimiterBasedFrameReceiver {

    private static final String DELIMITER = "\n"; // 分隔符
    private static final int MAX_FRAME_LENGTH = 1024; // 最大帧长度

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     // 添加分隔符解码器
                     ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());
                     p.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter));
                     // 添加字符串解码器
                     p.addLast(new StringDecoder());
                     // 添加自定义处理器
                     p.addLast(new DelimiterBasedFrameHandler());
                 }
             });

            // 启动服务器
            b.bind(8888).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static class DelimiterBasedFrameHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String receivedMsg = (String) msg;
            System.out.println("Received: " + receivedMsg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

基于长度字段的解码器

基于长度字段的解码器是指在消息头部添加长度字段,指示消息的总长度。

实现方式

Netty 提供了 LengthFieldBasedFrameDecoder 类来实现这一功能,核心源码如下:

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
private final int maxFrameLength;
private final int lengthFieldOffset;
private final int lengthFieldLength;

    public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        this.maxFrameLength = maxFrameLength;
        this.lengthFieldOffset = lengthFieldOffset;
        this.lengthFieldLength = lengthFieldLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < lengthFieldOffset + lengthFieldLength) {
            return;
        }

        in.markReaderIndex();
        int length = in.getInt(in.readerIndex() + lengthFieldOffset);
        if (in.readableBytes() < lengthFieldOffset + lengthFieldLength + length) {
            in.resetReaderIndex();
            return;
        }

        in.skipBytes(lengthFieldOffset + lengthFieldLength);
        ByteBuf frame = in.readBytes(length);
        out.add(frame);
    }
}

关键点

优点

缺点

示例

下面我们通过一个示例来展示使用长度字段是如何解决半包粘包问题的。

发送端,确保每个消息在发送前都包含长度字段。长度字段通常放在消息的头部,用于指示消息的总长度。

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;

public class LengthFieldBasedFrameSender {

    public static void send(Channel channel, String message) {
        // 将消息转换为字节数组
        byte[] messageBytes = message.getBytes();
        int messageLength = messageBytes.length;

        // 创建一个 ByteBuf 来存储长度字段和消息内容
        ByteBuf buffer = Unpooled.buffer(4 + messageLength);

        // 写入长度字段(4 字节,表示消息长度)
        buffer.writeInt(messageLength);

        // 写入消息内容
        buffer.writeBytes(messageBytes);

        // 发送消息
        channel.writeAndFlush(buffer);
    }
}

接收端,使用 Netty 提供的 LengthFieldBasedFrameDecoder 解码器来处理包含长度字段的消息。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class LengthFieldBasedFrameReceiver {

    private static final int MAX_FRAME_LENGTH = 1024; // 最大帧长度

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     // 添加长度字段解码器
                     p.addLast(new LengthFieldBasedFrameDecoder(
                         MAX_FRAME_LENGTH, 0, 4, 0, 4));
                     // 添加字符串解码器
                     p.addLast(new StringDecoder());
                     // 添加自定义处理器
                     p.addLast(new LengthFieldBasedFrameHandler());
                 }
             });

            // 启动服务器
            b.bind(8888).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static class LengthFieldBasedFrameHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String receivedMsg = (String) msg;
            System.out.println("Received: " + receivedMsg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

自定义解码器

如果上述 Netty提供的方案无法满足业务需求的话,Netty还提供了一个扩展点,使用者可以通过自定义解码器来处理消息,

实现方式

例如,自定义头部信息来表示消息长度或结束标志,示例代码如下:

public class CustomProtocolDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 根据自定义协议解析消息
        if (in.readableBytes() < 4) {
            return;
        }

        in.markReaderIndex();
        int length = in.readInt();
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }

        ByteBuf frame = in.readBytes(length);
        out.add(frame);
    }
}

优点

缺点

总结

本文我们分析了产生半包和粘包的原因以及在Netty中的 5种解决方案:

通过学习这些内容,我们不仅掌握了半包和粘包问题的理论知识,同时学会了多种解决方法的具体实现。

以上就是Netty解决半包和粘包问题的方案的详细内容,更多关于Netty解决半包和粘包的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文