使用java项目搭建一个netty服务
作者:傀儡师
这篇文章主要为大家详细介绍了如何使用java项目搭建一个netty服务,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
映入依赖,只要保证有这个依赖,就不需要单独引入依赖,支持多个端口直连,支持多个实现层解析数据,
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>3.3.4</version>
yml配置
# TCP设备对接
iot:
device:
port1: 1883
port2: 1885
package com.cqcloud.platform.handler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.cqcloud.platform.service.IotNbIotMqttService;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.service.impl.IotNbIotServiceImpl;
import com.cqcloud.platform.service.impl.IotPushServiceImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import jakarta.annotation.PostConstruct;
/**
* @author weimeilayer@gmail.com ✨
* @date 💓💕 2022年3月8日🐬🐇 💓💕
*/
@Component
public class NettyTcpServer {
/**
* 用于自设备1协议端口
*/
private static int PORT1;
/**
* 来自设备2协议端口
*/
private static int PORT2;
@Value("${iot.device.port1}")
public int port1Value;
@Value("${iot.device.port2}")
public int port2Value;
@PostConstruct
public void init() {
PORT1 = port1Value;
PORT2 = port2Value;
}
public void start() throws Exception {
final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
// 创建 MqttService 和 MqttPushService 实例
IotNbIotMqttService iotNbIotMqttService = new IotNbIotServiceImpl();
IotPushService iotPushService = new IotPushServiceImpl();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 直接使用 ByteBuf,无需编码器和解码器
// 根据端口注入不同的服务
if (ch.localAddress().getPort() == PORT1) {
pipeline.addLast(new TcpIotNbServerHandler(iotNbIotMqttService)); // 业务逻辑处理器
} else if (ch.localAddress().getPort() == PORT2) {
pipeline.addLast(new TcpIotServerHandler(iotPushService)); // 新处理器
}
}
});
// 绑定第一个端口并启动
ChannelFuture future1 = bootstrap.bind(PORT1).sync();
// 绑定第二个端口并启动
ChannelFuture future2 = bootstrap.bind(PORT2).sync();
// 等待服务器关闭
future1.channel().closeFuture().sync();
future2.channel().closeFuture().sync();
} finally {
// 优雅地关闭线程池
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}启动类需要
public static void main(String[] args) throws IOException {
ConfigurableEnvironment env = new SpringApplication(DynamicYearningApplication.class).run(args).getEnvironment();
String envPort = env.getProperty("server.port");
String port = Objects.isNull(envPort) ? "8000" : envPort;
String envContext = env.getProperty("server.servlet.context-path");
String contextPath = Objects.isNull(envContext) ? "" : envContext;
String path = port + contextPath + "/doc.html";
String externalAPI = InetAddress.getLocalHost().getHostAddress();
Console.log("Access URLs:\n\t-------------------------------------------------------------------------\n\tLocal-swagger: \t\thttp://127.0.0.1:{}\n\tExternal-swagger: \thttp://{}:{}\n\t-------------------------------------------------------------------------",path, externalAPI, path);
// 加上以下代码
NettyTcpServer server = new NettyTcpServer();
try {
server.start();
} catch (Exception e) {
e.printStackTrace();
}
}
创建TcpIotServerHandler
package com.cqcloud.platform.handler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.cqcloud.platform.entity.IotCommandRecords;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.utils.DeviceActionParser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* 设备协议
* @author weimeilayer@gmail.com ✨
* @date 💓💕 2022年3月8日 🐬🐇 💓💕
*/
@Slf4j
public class TcpIotServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 接口注入
private final IotPushService iotPushService;
public TcpIotServerHandler(IotPushService iotPushService) {
this.iotPushService = iotPushService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
byte[] byteArray;
if (in.readableBytes() <= 0) {
in.release();
return;
}
byteArray = new byte[in.readableBytes()];
in.readBytes(byteArray);
if (byteArray.length <= 0) {
in.release();
return;
}
// 将消息传递给 iotPushService
iotPushService.pushMessageArrived(byteArray);
}
// 发送响应的统一辅助方法
private void sendResponse(ChannelHandlerContext ctx, String hexResponse) {
byte[] responseBytes = hexStringToByteArray(hexResponse);
ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
ctx.writeAndFlush(responseBuffer);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印异常堆栈跟踪,便于调试和错误排查
cause.printStackTrace();
// 关闭当前的通道,释放相关资源
ctx.close();
}
}创建 TcpIotNbServerHandler
package com.cqcloud.platform.handler;
import com.cqcloud.platform.service.IotNbIotMqttService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* NB-IOT CAT1数据格协议
*
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
public class TcpIotNbServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final IotNbIotMqttService iotNbIotMqttService;
// 构造函数注入 MqttService
public TcpIotNbServerHandler(IotNbIotMqttService iotNbIotMqttService) {
this.iotNbIotMqttService = iotNbIotMqttService;
}
@Override
public void channelRead0(ChannelHandlerContext ctx,ByteBuf in) {
byte[] byteArray;
if (in.readableBytes() <= 0) {
in.release();
return;
}
byteArray = new byte[in.readableBytes()];
in.readBytes(byteArray);
if (byteArray.length <= 0) {
in.release();
return;
}
// 将 byte[] 数据传递给 iotNbIotMqttService
iotNbIotMqttService.messageArrived(byteArray);
//发送固定事件默认回复
sendResponse(ctx);
}
// 发送响应的统一辅助方法
private void sendResponse(ChannelHandlerContext ctx) {
// 回复客户端--向设备回复AAAA8001(设备将保持20秒不休眠),平台尽量在10秒
byte[] responseBytes = new byte[] { (byte) 0xAA, (byte) 0xAA, (byte) 0x80, (byte) 0x01 };
ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
ctx.writeAndFlush(responseBuffer);
}
//将响应消息转换为字节数组
public static byte[] hexStringToByteArray(String s) {
int len = s.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+ Character.digit(s.charAt(i + 1), 16));
}
return data;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
创建接口类IotPushService
package com.cqcloud.platform.service;
/**
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
public interface IotPushService {
public void pushMessageArrived(byte[] message);
}
创建IotNbIotMqttService 类
package com.cqcloud.platform.service;
/**
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
public interface IotNbIotMqttService {
public void messageArrived(byte[] message);
}
创建实现类IotNbIotServiceImpl
package com.cqcloud.platform.service.impl;
import org.springframework.stereotype.Service;
import com.cqcloud.platform.service.IotNbIotMqttService;
import com.cqcloud.platform.utils.DataParser;
import lombok.AllArgsConstructor;
/**
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
@Service
@AllArgsConstructor
public class IotNbIotServiceImpl implements IotNbIotMqttService {
@Override
public void messageArrived(byte[] message) {
// 将 byte 数组转换为十六进制字符串
String convertData = printByteArray(message);
// 打印字节数组内容
System.out.println("来自于xxx数据格式协议的1883端口的数据字节数组内容:"+ convertData);
//调用解析方法
dispatchMessage(convertData);
}
// 将 byte[] 转换为十六进制字符串的辅助方法
public static String bytesToHex(byte[] bytes) {
StringBuilder hex = new StringBuilder();
for (byte b : bytes) {
// 将每个字节转换为两位的十六进制表示
hex.append(String.format("%02X", b));
}
return hex.toString();
}
public static String printByteArray(byte[] byteArray) {
StringBuilder hexString = new StringBuilder();
for (byte b : byteArray) {
// 将字节转换为无符号的十六进制字符串,去掉空格
hexString.append(String.format("%02X", b & 0xFF));
}
System.out.println("Byte Array (Hex): " + hexString.toString());
return hexString.toString();
}
public void dispatchMessage(String byteArray) {
String prefix = byteArray.substring(0, 2);
// 根据 messageID 进行判断
System.out.println("来自于数据格式协议来自于1883端口的数据处理消息:" +byteArray);
}
}
创建 IotPushServiceImpl
package com.cqcloud.platform.service.impl;
import org.springframework.stereotype.Service;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.utils.DeviceActionParser;
import lombok.AllArgsConstructor;
/**
* 发送指令实现类
* @author weimeilayer@gmail.com
* @date 💓💕2022年3月8日🐬🐇💓💕
*/
@Service
@AllArgsConstructor
public class IotPushServiceImpl implements IotPushService {
@Override
public void pushMessageArrived(byte[] message) {
// 解析字节数组
System.out.println("来自物联网平台的设备协议于1885端口的数据设备返回的的内容处理");
//打印数据
printByteArray(message);
//调用解析方法
dispatchMessage(message);
}
//设备回复的接受内容
public static void dispatchMessage(byte[] byteArray) {
}
public static void printByteArray(byte[] byteArray) {
StringBuilder hexString = new StringBuilder();
for (byte b : byteArray) {
// 将字节转换为无符号的十六进制字符串,去掉空格
hexString.append(String.format("%02X", b & 0xFF));
}
System.out.println("Byte Array (Hex): " + hexString.toString());
}
// 将十六进制字符串转换为字节数组的实用方法
public static byte[] stringToBytes(String s) {
int len = s.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+ Character.digit(s.charAt(i+1), 16));
}
return data;
}
// 提取设备类型的十六进制字符串
private static String extractDeviceTypeHex(byte[] byteArray) {
// 转换为十六进制字符串
String hexString = bytesToHex(byteArray);
// 提取设备类型
return hexString.substring(10, 12); // 设备类型的位数
}
// 辅助方法:将字节数组转换为十六进制字符串
private static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xFF & b);
if (hex.length() == 1) {
hexString.append('0'); // 确保每个字节都为两位
}
hexString.append(hex);
}
return hexString.toString().toUpperCase(); // 返回大写格式
}
// 将十六进制字符串转换为 byte
private static byte hexStringToByte(String hex) {
return (byte) Integer.parseInt(hex, 16);
}
}
然后使用网络根据助手请求。

以上就是使用java项目搭建一个netty服务的详细内容,更多关于java搭建netty服务的资料请关注脚本之家其它相关文章!
