java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot使用 Netty

SpringBoot中使用Netty的实现示例

作者:cg5017

本文主要介绍了SpringBoot中使用Netty的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一、什么是Netty

Netty 是 Java 中一个非常高性能的网络通信框架,用来开发服务器和客户端程序,主要用于处理 TCP/UDP 的网络连接,比如:

你可以把 Netty 理解为一种比 Java 原生 Socket 更方便、性能更强的“网络搭建工具”。再详细了解Netty的工作原理之前,我们先来看一下Java中最简单的客户端和服务器之间的连接。

二、最简单的 Java 网络通信

2.1什么是“客户端”和“服务端”?

我们先理解一个现实生活的比喻奶茶店点单系统

还可以更加省略一点来说就是 💬 一个人发送消息(客户端) ➜ 另一个人接收并回复(服务端)

2.2服务端

import java.io.*;
import java.net.*;

public class Server {
    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8080); // 在8080端口等别人来找
        System.out.println("服务端启动,等待客户端连接...");

        Socket socket = serverSocket.accept(); // 有人来连接,就接收它
        System.out.println("客户端连接进来了");

        // 输入输出流:用来读写数据
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 读
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 写

        String line;
        while ((line = in.readLine()) != null) {
            System.out.println("收到客户端消息:" + line);
            out.println("我收到了:" + line); // 回给客户端
        }

        socket.close(); // 关闭连接
        serverSocket.close();
    }
}

2.3客户端

import java.io.*;
import java.net.*;

public class Client {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1", 8080); // 连接本机服务端
        System.out.println("连接服务端成功!");

        // 输入输出
        BufferedReader userInput = new BufferedReader(new InputStreamReader(System.in)); // 你键盘输入
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 发消息
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 收消息

        String msg;
        while ((msg = userInput.readLine()) != null) {
            out.println(msg); // 发给服务端
            String reply = in.readLine(); // 读取服务端返回
            System.out.println("服务端说:" + reply);
        }

        socket.close();
    }
}

2.4 服务端和客户端之间的通信

首先是服务端先启动,会有如下显示,同时告诉顾客我家的店的端口号是8080。

服务端启动,等待客户端连接...

然后有顾客想买东西,通过 new Socket("127.0.0.1", 8080); // 连接本机服务端,即走进服务器店的大门8080。而在服务器这端,通过serverSocket.accept(); 看见有人来连接,就接收它,服务它。这时候客户端会输出如下

连接服务端成功!

服务端会输出如下:

客户端连接进来了

在客户端通过控制台输入:hello后,即通过如下代码接收到了你的输入,并存放在userInput变量中。

BufferedReader userInput = new BufferedReader(new InputStreamReader(System.in));

客户端通过out对象发消息 

 PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 发消息

 客户端通过in对象接受消息 

BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 收消息

当 msg = userInput.readLine()) != null ,即当检测到客户端要发送消息就执行如下代码:

out.println(msg); // 发给服务端
String reply = in.readLine(); // 读取服务端返回
System.out.println("服务端说:" + reply);

out.println(msg)后,就将信息发送到了服务器端,服务器端就会输出如下

收到客户端消息:hello

同时在服务器端通过 out.println("我收到了:" + line); 回给客户端,客户端通过reply接收到消息,客户端就会输出

服务端说:我收到了:hello

2.5 客户端和服务器端的关系如下: 

角色作用
Server永远在等别人来(监听端口)
Client主动发起连接
Input/Output收发消息用的“通道”

 二、为什么需要线程模型?(Thread Model)

理解了基础的服务端和客户端通信,我们可以继续深入,了解一些稍微复杂一点的概念,即线程

在前面那个简单的服务端/客户端例子中,服务端是“串行”的,意思是:

但是如果你有很多客户端同时发消息,服务端就会变得很慢,因为它只能一个一个地处理请求。

所以,我们需要更高效的处理方式:并发编程并发编程意味着能够同时处理多个任务,不等一个任务完成再开始下一个。而且每个任务都不会相互阻塞。这就是 线程池 和 事件循环模型 的价值所在。在 Netty 中:

三、什么是“阻塞”和“非阻塞”?

❌ 阻塞:你去餐厅吃饭,服务员给你一个菜单,但你必须等着他们准备好菜才能吃,期间你不能干别的事。

✅ 非阻塞:你点菜后,服务员会告诉你“稍等一会儿”,然后你可以做其他事。只要菜做好了,服务员会告诉你,打断你做其他事,给你菜。

TCP 通信中的阻塞和非阻塞:

Netty 默认就是 非阻塞 的,这样它能同时处理很多连接,不会被一个请求堵住。

四、Netty 是如何处理高并发的?

Netty通过使用一个线程模型 EventLoop(事件循环)来处理高并发。EventLoopGroup:管理多个线程(可以理解为多个服务员),负责处理网络事件。EventLoop:每个线程负责自己的一部分任务,比如处理某一个客户端的请求。

举例来看就是:

4.1 EventLoop 和 NIO 的关系

Netty 使用了 NIO(非阻塞 IO) 模型。NIO 让一个线程能处理多个连接。具体来说:

这个模型让 Netty 在面对数千个并发连接时,也能保持高效。

总结来看,Netty的EventLoopGroup管理多个线程,每个线程只干特定的事情,假设某个线程只干连接客户端这个事情,又由于Netty引入了NIO模型,所以又让这个负责处理连接的线程具备了同时处理多个连接请求的能力

五、实际的 Netty 服务端示例

public class EchoServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);  // 负责接收连接
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // 负责处理请求

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new EchoServerHandler());

            ChannelFuture f = b.bind(8080).sync();  // 绑定端口,开始监听
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // 处理客户端发来的消息
    public static class EchoServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 接收到数据后直接写回给客户端
            System.out.println("收到消息:" + msg);
            ctx.writeAndFlush(msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close(); // 出现异常关闭连接
        }
    }
}

六、实际使用的Netty

6.1 NettyServer类

ServerBootstrap:Netty服务器启动的核心类。

ServerBootstrap serverBootstrap = new ServerBootstrap()
    .group(bossGroup, workGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ServerChannelInitializer(delimiter, maxFrameLength))
    .localAddress(socketAddress)
    .option(ChannelOption.SO_BACKLOG, 1024)
    .childOption(ChannelOption.SO_KEEPALIVE, true);

6.1.1启动并绑定端口

ChannelFuture channelFuture = serverBootstrap.bind(socketAddress).sync();

6.2 SeverChannelInitializer类

在NettyServer类中,我们是调用了SeverChannelInitializer类的,我们使用SeverChannelInitializer类来配置如何处理数据的编解码、业务逻辑等。当每个客户端连接进来时,配置它的 Channel 的“流水线”——也就是这个连接收到/发送数据时,按什么顺序怎么处理。可以把它理解为工厂生产线的“组装说明书”。

package com....nettyService;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LoggingHandler;

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    private String DELIMITER;
    private int MAXFRAMELENGTH;

    public ServerChannelInitializer(String delimiter, int maxFrameLength) {
        DELIMITER = delimiter;
        MAXFRAMELENGTH = maxFrameLength;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast("logging", new LoggingHandler("DEBUG"));
        socketChannel.pipeline().addLast("decoder", new HL7Decoder());
        socketChannel.pipeline().addLast("encoder", new HL7Encoder());
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }

}

SeverChannelInitializer首先继承了ChannelInitializer<SocketChannel>,这样没有一个新的连接的时候Netty 就会调用 initChannel() 方法,给这个连接安装一套“处理器组合”(pipeline)。

而这一套“处理器组合”当接收到客户端发送的消息执行顺序如下:

【客户端】==> socketChannel
         ↓
[LoggingHandler](打印日志)
         ↓
[HL7Decoder](解码消息)
         ↓
[NettyServerHandler](业务处理)

当服务端要回复消息,其执行顺序如下:

   NettyServerHandler.write()
         ↓
       [HL7Encoder](编码为字节)
         ↓
         [LoggingHandler](打印)
         ↓
          【客户端】

6.3 NettySeverHandler类

在SeverChannelInitializer类中,其写好了业务处理顺序,在处理业务时,其处理业务的核心是NettySeverHandler类来实现的

package com.....nettyService;

import com...component.commons.utils.BeanUtils;
import com...emergency.service.impl.BS2800MPacketParse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;


@Component
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    private BS2800MPacketParse bs2800MPacketParse = BeanUtils.getBean(BS2800MPacketParse.class);
    /**
     * 装载所有客户端channel的组
     */
    private static final Map<String, Channel> ipChannelMap = new HashMap<>();

    /**
     * 客户端连接过来会触发
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        Channel channel = channelHandlerContext.channel();
        ipChannelMap.put(channel.remoteAddress().toString(), channel);

        logger.info("客户端连接:" + channelHandlerContext);
    }

    /**
     * 客户端发消息过来会触发
     */
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        Channel channel = channelHandlerContext.channel();
        logger.info("服务端接收到客户端消息");
//        logger.info("发送消息的客户端地址:" + channel.remoteAddress());
        logger.info("发送消息的客户端所发消息:" + msg);

        String result = msg;
        String msa = handleParams(channelHandlerContext, result);
        if (ObjectUtils.isNotEmpty(msa)) {
            channel.writeAndFlush(msa);
        }

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelReadComplete(channelHandlerContext);
    }

    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        Channel channel = channelHandlerContext.channel();
        // 当通道变为非活动状态(断开连接)时,将其从 ChannelGroup 中移除
        String ip = channel.remoteAddress().toString();
        if (ipChannelMap.containsKey(ip)) {
            ipChannelMap.remove(ip);
            if (!channel.isActive() || channel == null) {
                channelHandlerContext.close();
            }
        }
        logger.info("客户端地址为:" + ip + "的连接已断开");
    }

    /**
     * 发生异常触发
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) throws Exception {
        logger.warn(cause.toString());
    }

    /**
     * 处理接收报文消息
     */
    public String handleParams(ChannelHandlerContext channelHandlerContext, String msg) {
        String msa = null;
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
        Channel channel = channelHandlerContext.channel();
        if (channel.remoteAddress().toString().contains("10.10.51.213")) {
            if (ObjectUtils.isNotEmpty(msg)) {
                String result[] = msg.split("\r");
                if (ObjectUtils.isNotEmpty(result) && result.length > 0) {
                    String msh = null;
                    for (String string : result) {
                        if (string.contains("MSH")) {
                            msh = string;
                        }
                    }
                    if (msh.contains("ORU^R01")) {
                        Date date = new Date();
                        String temp[] = msh.split("\\|", -1);
                        if (ObjectUtils.isNotEmpty(temp) && temp.length > 9) {
                            msa = "MSH|^~\\&|||||" + dateFormat.format(date) + "||ACK^R01|" + temp[9] + "|P|2.3.1||||0||ASCII|||";
                            String str = "MSA|AA|" + temp[9] + "|Message accepted|||0|";
                            msa = msa + "\r" + str;
                            Map<String, String> paramMap = new HashMap<>();
                            paramMap.put(temp[9], msg);
                            bs2800MPacketParse.parse(msg);
                            return msa;
                        }
                    }
                }
            }
        }
        return msa;
    }

}

6.3.1继承

NettyServerHandler继承SimpleChannelInboundHandler<String> 每次接收到客户端消息(已经是 String 类型,说明解码器已完成解码),就会触发 channelRead0() 方法。我们可以在这里处理逻辑、保存数据、做回复等

6.3.2channelActive

有客户端连接进来时,Netty 会自动调用这个方法。将客户端的 Channel 保存到 ipChannelMap 中,方便后面用 IP 找到连接。同时打印客户端连接信息。

6.3.3channelRead0

每当客户端发一条消息过来,就会自动执行这里!先获取当前的 Channel(对应客户端)

Channel channel = channelHandlerContext.channel();

打印日志,方便调试看到收到的数据

到此这篇关于SpringBoot中使用Netty的实现示例的文章就介绍到这了,更多相关SpringBoot使用 Netty内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文