Netty搭建WebSocket服务器实战教程
作者:别怕我只是一只羊~
这篇文章主要介绍了使用 Netty 搭建 WebSocket 服务器的实战过程,文中介绍得非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考一下。
项目结构:
引入jar包:
<!-- netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.77.Final</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
application.yml配置
netty:
  server:
    # The key must be `ip`: ServerProperties (prefix "netty.server") declares an `ip`
    # field and no `host` field, so a `host` entry would be silently ignored.
    ip: 127.0.0.1
    port: 27001
    use-epoll: false
配置类
/**
 * Holder for the {@code netty.server.*} configuration entries.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = ServerProperties.PREFIX)
public class ServerProperties {

    /** Configuration prefix under which all properties of this class live. */
    public static final String PREFIX = "netty.server";

    /** Server IP address. */
    private String ip;

    /** Server port. */
    private Integer port;

    /** Transport mode switch; enabling it on Linux gives better performance. */
    private boolean useEpoll;
}
业务类:
/**
 * Retention policy entry sent by a client.
 *
 * <p>Accessors (chained setters) are generated by Lombok.
 */
@Accessors(chain = true)
@Data
public class Policy implements Serializable {

    private static final long serialVersionUID = 6816331623389002880L;

    /** File level this policy applies to. */
    private Integer fileLevel;

    /** Validity period of files at this level. */
    private Integer validTime;
}
/** * 服务端 到 客户端 */ @Data @Accessors(chain = true) public class RequestDTO implements Serializable { private static final long serialVersionUID = 4284674560985442616L; /** * 标识 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String request; @JsonInclude(JsonInclude.Include.NON_EMPTY) private String response; /** * 认证码 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String key; /** * 返回结果 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String result; /** * 状态码 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer status; /** * 人员列表 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private List<Person> persons; /** * 发送该命令的账户名 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String handler; /** * 发送该命令的账户ID */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer handlerId; /** * 舱门下标 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer index; /** * 单位 编号 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer departmentId; /** * 单位名称 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String departmentName; /** * 修改柜门参数 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private List<CabinetInfoVO> data; /** * 人员对应的开门命令 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private List<Command> command; /** * 根据设备编号查询设备信息 */ // 设备编号 @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer deviceId; // 设备编号 @JsonInclude(JsonInclude.Include.NON_EMPTY) private String deviceNumber; @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer personId; // 人员编号 @JsonInclude(JsonInclude.Include.NON_EMPTY) private String personNumber; // 人员姓名 @JsonInclude(JsonInclude.Include.NON_EMPTY) private String personName; // 所属单位 ID // 所属单位名称 // 文件列表 private String[] fileNames; /** * 删除过期文件 */ private Long fileId; private String fileName; private String localPath; }
/** * 客户端 到 服务端 */ @Data @Accessors(chain = true) public class ResponseDTO implements Serializable { private static final long serialVersionUID = -7674457360121804081L; /** * 标识 */ private String request; private String response; /** * 认证结果值 */ private Integer value; /** * 响应状态码 */ private Integer status; private Integer index; /** * 客户端ID */ private Long id; /** * 客户端IP地址 */ private String ip; /** * 客户端版本号 */ private String version; /** * 装备柜行数 */ private Integer rows; /** * 装备柜列数 */ private Integer cols; /** * 各柜门状态 */ private String lockStatus; /** * 柜内物品名称 */ private String boxName; /** * 日志内容 */ private String content; /** * 操作类型 * 1.人员主动解锁 * 2.管理员利用账号权限强制解锁 */ private Integer handlerType; /** * type 为1时表示人员编号,为2时表示管理员账号用户名 */ private String handler; /** * 命令所属人员 */ private Integer personId; /** * 命令主键编号 */ private Integer commandId; /** * 设备编号 */ private String deviceNumber; /** * 文件列表 */ private String[] fileNames; /** * insertFile */ private Integer deviceId; private String fileName; private String size; private Integer type; private String createTime; private String copyTime; /** * 设备编号(缺省) */ // 人员编号 private String personNumber; private String personName; private Integer departmentId; private String departmentName; private Integer duration; private Integer width; private Integer height; private Boolean autoImportant; private String localPath; private String devicePath; private Integer collectionId; /** * 删除文件 */ private Long fileId; private List<Policy> policy; public String getRequest() { if (ObjectUtils.isEmpty(request)) { return null; } return request; } public String getResponse() { if (ObjectUtils.isEmpty(response)) { return null; } return response; } public Integer getValue() { if (ObjectUtils.isEmpty(value)) { return null; } return value; } public Integer getStatus() { if (ObjectUtils.isEmpty(status)) { return null; } return status; } public Long getId() { if (ObjectUtils.isEmpty(id)) { return null; } return id; } public String getIp() { if 
(ObjectUtils.isEmpty(ip)) { return null; } return ip; } public String getVersion() { if (ObjectUtils.isEmpty(version)) { return null; } return version; } public Integer getRows() { if (ObjectUtils.isEmpty(rows)) { return null; } return rows; } public Integer getCols() { if (ObjectUtils.isEmpty(cols)) { return null; } return cols; } public String getLockStatus() { if (ObjectUtils.isEmpty(lockStatus)) { return null; } return lockStatus; } public String getContent() { if (ObjectUtils.isEmpty(content)) { return null; } return content; } public Integer getHandlerType() { if (ObjectUtils.isEmpty(handlerType)) { return null; } return handlerType; } public String getHandler() { if (ObjectUtils.isEmpty(handler)) { return null; } return handler; } public String getBoxName() { if (ObjectUtils.isEmpty(boxName)) { return null; } return boxName; } public Integer getPersonId() { if (ObjectUtils.isEmpty(personId)) { return null; } return personId; } public Integer getCommandId() { if (ObjectUtils.isEmpty(commandId)) { return null; } return commandId; } public String getDeviceNumber() { if (ObjectUtils.isEmpty(deviceNumber)) { return null; } return deviceNumber; } public String[] getFileNames() { if (ObjectUtils.isEmpty(fileNames) || fileNames.length == 0) { return null; } return fileNames; } public String getFileName() { if (ObjectUtils.isEmpty(fileName)) { return null; } return fileName; } public String getSize() { if (ObjectUtils.isEmpty(size)) { return null; } return size; } public Integer getType() { if (ObjectUtils.isEmpty(type)) { return null; } return type; } public String getCreateTime() { if (ObjectUtils.isEmpty(createTime)) { return null; } return createTime; } public String getCopyTime() { if (ObjectUtils.isEmpty(copyTime)) { return null; } return copyTime; } public String getPersonNumber() { if (ObjectUtils.isEmpty(personNumber)) { return null; } return personNumber; } public String getPersonName() { if (ObjectUtils.isEmpty(personName)) { return null; } return 
personName; } public Integer getDepartmentId() { if (ObjectUtils.isEmpty(departmentId)) { return null; } return departmentId; } public String getDepartmentName() { if (ObjectUtils.isEmpty(departmentName)) { return null; } return departmentName; } public Integer getDuration() { if (ObjectUtils.isEmpty(duration)) { return null; } return duration; } public Integer getWidth() { if (ObjectUtils.isEmpty(width)) { return null; } return width; } public Integer getHeight() { if (ObjectUtils.isEmpty(height)) { return null; } return height; } public Boolean isAutoImportant() { if (ObjectUtils.isEmpty(autoImportant)) { return null; } return autoImportant; } public String getLocalPath() { if (ObjectUtils.isEmpty(localPath)) { return null; } return localPath; } public String getDevicePath() { if (ObjectUtils.isEmpty(devicePath)) { return null; } return devicePath; } public Integer getCollectionId() { if (ObjectUtils.isEmpty(collectionId)) { return null; } return collectionId; } public Boolean getAutoImportant() { return autoImportant; } }
客户端信息类:
@Data @Accessors(chain = true) public class SocketSession implements Serializable { private static final long serialVersionUID = 7585070255615177561L; private Channel channel; private long cabinetId; //智能柜唯一ID private int rows; private int cols; private Map<Integer, Boolean> lockStatusMap;//智能柜门锁状态 private Map<Integer, String> boxNameMap; //智能柜柜内物品名称 private long authenticationKey; //socket连接认证私有密钥 private long lastHeartbeatTime; //最后一次心跳时间 }
管理WebSocket握手会话
/**
 * Registry of WebSocket handshakers, keyed by channel id.
 *
 * <p>The registry is a static map shared by all callers, so it is backed by a
 * concurrent map. Unlike {@code HashMap}, a concurrent map rejects {@code null}
 * keys and values; the methods below translate those cases into "no entry"
 * so that callers passing {@code null} keep working.
 */
public class WebSocketSession {

    private static final java.util.concurrent.ConcurrentMap<ChannelId, WebSocketServerHandshaker> CHANNEL_SHAKER =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Registers the handshaker of a channel.
     *
     * @param channelId  id of the channel; ignored when {@code null}
     * @param handShaker handshaker to store; {@code null} removes any existing entry,
     *                   which is indistinguishable from a stored {@code null} for readers
     */
    public static void setChannelShaker(ChannelId channelId, WebSocketServerHandshaker handShaker) {
        if (channelId == null) {
            return;
        }
        if (handShaker == null) {
            CHANNEL_SHAKER.remove(channelId);
            return;
        }
        CHANNEL_SHAKER.put(channelId, handShaker);
    }

    /**
     * Looks up the handshaker of a channel.
     *
     * @param channelId id of the channel
     * @return the registered handshaker, or {@code null} when none is registered
     */
    public static WebSocketServerHandshaker getChannelShaker(ChannelId channelId) {
        return channelId == null ? null : CHANNEL_SHAKER.get(channelId);
    }

    /**
     * Drops the entry of a channel, if any.
     *
     * @param channelId id of the channel
     */
    public static void clear(ChannelId channelId) {
        if (channelId != null) {
            CHANNEL_SHAKER.remove(channelId);
        }
    }
}
客户端连接时认证方法
/**
 * Checks an authentication value against the value derived from a key.
 *
 * <p>The expected value is the sum of the eight unsigned bytes of {@code key},
 * doubled when the lowest byte is even and quadrupled when it is odd.
 *
 * @param key            timestamp used as the key
 * @param authentication value computed from the key by the other side
 * @return {@code true} when {@code authentication} equals the expected value
 */
public static boolean verify(long key, int authentication) {
    // The parity of the lowest byte is the parity of the key itself.
    int weightShift = (key & 1L) == 0 ? 1 : 2;

    int byteSum = 0;
    for (int offset = 0; offset < Long.SIZE; offset += Byte.SIZE) {
        byteSum += (int) ((key >>> offset) & 0xFF);
    }
    // Shifting the total once equals shifting every byte before adding it.
    return (byteSum << weightShift) == authentication;
}
客户端发送请求的所有操作工具类:
@Component @Slf4j public class OperateUtil { private final CollectionService collectionService; private final CollectionMapper collectionMapper; private final PersonMapper personMapper; private final LocklogService locklogService; private final CommandMapper commandMapper; private final DepartmentMapper departmentMapper; private final DeviceMapper deviceMapper; private final FileMapper fileMapper; private final AllProperties allProperties; private final ServerMapper serverMapper; public OperateUtil( CollectionService collectionService, PersonMapper personMapper, LocklogService locklogService, CollectionMapper collectionMapper, CommandMapper commandMapper, DepartmentMapper departmentMapper, DeviceMapper deviceMapper, FileMapper fileMapper, AllProperties allProperties, ServerMapper serverMapper ) { this.collectionService = collectionService; this.personMapper = personMapper; this.locklogService = locklogService; this.collectionMapper = collectionMapper; this.commandMapper = commandMapper; this.departmentMapper = departmentMapper; this.deviceMapper = deviceMapper; this.fileMapper = fileMapper; this.allProperties = allProperties; this.serverMapper = serverMapper; } public String ManyOperate(ResponseDTO responseDTO, ChannelHandlerContext ctx) { log.info("请求对象:{}", responseDTO); String msgHandler = "请求标识为空"; if (!ObjectUtils.isEmpty(responseDTO) && !ObjectUtils.isEmpty(responseDTO.getRequest())) { // 客户端请求,服务端响应 return msgHandler(responseDTO, responseDTO.getRequest(), ctx); } if (!ObjectUtils.isEmpty(responseDTO) && !ObjectUtils.isEmpty(responseDTO.getResponse())) { // 服务端请求,客户端响应 return msgHandler(responseDTO, responseDTO.getResponse(), ctx); } return msgHandler; } /** * 操作分发 * @param responseDTO * @param msg * @return */ private String msgHandler(ResponseDTO responseDTO, String msg, ChannelHandlerContext ctx) { switch (msg) { case "authenticate": return checkAuthenticate(responseDTO, ctx); case "login": return getCollectionInfo(responseDTO, ctx); case "heartbeat": 
return getHeartbeat(ctx); case "lockStatus": getLockStatus(responseDTO, ctx); return ""; case "personInfo": return getPersonInfo(responseDTO); case "unlockLog": return addLockLog(responseDTO); case "authenticationResult": return null; case "getOpenDoorCommand": return getDoorTask(responseDTO); case "completeOpenDoorCommand": return reciveDoor(responseDTO); case "deviceInfo": return getDvinfoByDvno(responseDTO); case "fileExist": return getFileExist(responseDTO); case "insertFile": return saveFile(responseDTO); case "getOverdueFile": return getOverdueFile(responseDTO); case "deletedOverdueFile": return deleteOverdueFile(responseDTO); default: return null; } } /** * 删除过期文件 * @param responseDTO * @return */ private String deleteOverdueFile(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO)) { log.info("删除文件参数为空"); requestDTO.setResponse("deletedOverdueFile") .setStatus(400); return JSON.toJSONString(requestDTO); } Boolean deleteFile = fileMapper.deleteFile(responseDTO.getFileId()); if (deleteFile) { requestDTO.setResponse("deletedOverdueFile") .setStatus(200); return JSON.toJSONString(requestDTO); } requestDTO.setResponse("deletedOverdueFile") .setStatus(201); return JSON.toJSONString(requestDTO); } /** * 查询过期文件 * @param responseDTO * @return */ private String getOverdueFile(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO) || ObjectUtils.isEmpty(responseDTO.getCollectionId())) { log.info("查询过期参数为空"); return null; } // 当前时间的实例 Calendar now = Calendar.getInstance(); now.setTime(new Date()); for (Policy policy : responseDTO.getPolicy()) { // 过期时间 now.set(Calendar.DATE, now.get(Calendar.DATE) - policy.getValidTime()); // 查询过期文件 File overdueFile = fileMapper.getOverdueFile(policy.getFileLevel(), now.getTime()); if (!ObjectUtils.isEmpty(overdueFile)) { // 查询到文件,填充返回,中断循环 requestDTO.setFileId(overdueFile.getFileId()) .setFileName(overdueFile.getFileName()) 
.setLocalPath(overdueFile.getFileLocalPath()); requestDTO.setResponse("getOverdueFile") .setStatus(200); return JSON.toJSONString(requestDTO); } } requestDTO.setResponse("getOverdueFile") .setStatus(201); return JSON.toJSONString(requestDTO); } /** * 存储文件 * @param responseDTO * @return */ private String saveFile(ResponseDTO responseDTO) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO)) { log.info("上报文件参数为空"); return null; } Server serverBySerialCode = serverMapper.getServerBySerialCode(allProperties.getSerialCode()); File file = new File(); Optional<ResponseDTO> optionalResponseDTO = Optional.ofNullable(responseDTO); file.setFileName(optionalResponseDTO.map(ResponseDTO::getFileName).orElseThrow()) .setFileSize(Long.parseLong(responseDTO.getSize())) .setFileType(responseDTO.getType()) .setFileDeviceId(responseDTO.getDeviceId()) .setFileDeviceNumber(responseDTO.getDeviceNumber()) .setFilePersonNumber(responseDTO.getPersonNumber()) .setFilePersonId(responseDTO.getPersonId()) .setFilePersonName(responseDTO.getPersonName()) .setFileDepartmentId(responseDTO.getDepartmentId()) .setFileDepartmentName(responseDTO.getDepartmentName()) .setFileDuration(responseDTO.getDuration()) .setFileWidth(responseDTO.getWidth()) .setFileHeight(responseDTO.getHeight()) .setFileAutoImportant(!responseDTO.getAutoImportant() ? 
0 : 1) .setFileLocalPath(responseDTO.getLocalPath()) .setFileDevicePath(responseDTO.getDevicePath()) .setFileCollectionId(responseDTO.getCollectionId()) .setFileNeedUpload(1) .setFileManualImportant(0) .setFileServerId(Optional.ofNullable(serverBySerialCode).map(Server::getServerId).orElse(0)) .setFileDeleteCollection(0) .setFileDeleteServer(0) .setFileLock(0); try { file.setFileCreateTime(sdf.parse(responseDTO.getCreateTime())) .setFileCopyTime(sdf.parse(responseDTO.getCopyTime())); } catch (ParseException e) { throw new RuntimeException(e); } boolean insertFile = fileMapper.insertFile(file); if (insertFile) { requestDTO.setResponse("insertFile") .setStatus(200) .setIndex(responseDTO.getIndex()); return JSON.toJSONString(requestDTO); } requestDTO.setResponse("insertFile") .setStatus(-1) .setIndex(responseDTO.getIndex()); return JSON.toJSONString(requestDTO); } /** * 查询文件是否存在 * @param responseDTO * @return */ private String getFileExist(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO) || ObjectUtils.isEmpty(responseDTO.getFileNames())) { log.info("文件名称列表 参数为空"); return null; } // 返回给客户端的文件名集合 List<String> result = new ArrayList<>(); // 逐一检查文件名 for (String fileName : responseDTO.getFileNames()) { File fileByName = fileMapper.getFileByName(fileName); if (ObjectUtils.isEmpty(fileByName)) { // 未查询到数据,将文件名返回给客户端 result.add(fileName); } } String[] file = result.toArray(new String[result.size()]); requestDTO.setResponse("fileExist") .setFileNames(file) .setIndex(responseDTO.getIndex()); return JSONObject.toJSONString(requestDTO); } private String getDvinfoByDvno(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO) || ObjectUtils.isEmpty(responseDTO.getDeviceNumber())) { log.info("设备编号 参数为空"); return null; } // 查询数据库 Device deviceByDeviceNum = deviceMapper.getDeviceByDeviceNum(responseDTO.getDeviceNumber()); if (ObjectUtils.isEmpty(deviceByDeviceNum)) { 
requestDTO.setResponse("deviceInfo") .setStatus(201); return JSONObject.toJSONString(requestDTO); } Optional<Device> optionalDevice = Optional.ofNullable(deviceByDeviceNum); // 拼接参数 requestDTO.setResponse("deviceInfo") .setStatus(200) .setIndex(responseDTO.getIndex()) .setDeviceId(optionalDevice.map(Device::getDeviceId).orElse(0)) .setDeviceNumber(optionalDevice.map(Device::getDeviceNumber).orElse("")) .setPersonId(optionalDevice.map(Device::getPerson).map(Person::getPersonId).orElse(0)) .setPersonNumber( optionalDevice.map(Device::getPerson).map(Person::getPersonNumber).orElse("")) .setPersonName(optionalDevice.map(Device::getPerson).map(Person::getPersonName).orElse("")) .setDepartmentId(optionalDevice.map(Device::getDepartment).map(Department::getDepartmentId) .orElse(0)) .setDepartmentName( optionalDevice.map(Device::getDepartment).map(Department::getDepartmentName) .orElse("")); return JSONObject.toJSONString(requestDTO); } /** * 请求认证方法 * @param responseDTO * @return */ private String checkAuthenticate(ResponseDTO responseDTO, ChannelHandlerContext ctx) { RequestDTO requestDTO = new RequestDTO(); assert responseDTO != null; if (ObjectUtils.isEmpty(responseDTO.getValue())) { log.error("value值为空"); return null; } for (SocketSession socketSession : WebsocketMessageHandler.AllSocket) { if (ctx.channel() == socketSession.getChannel()) { long key = socketSession.getAuthenticationKey(); boolean verify = MyEncrypt.verify(key, responseDTO.getValue()); if (verify) { log.info("认证成功,返回给客户端"); requestDTO.setRequest("authenticationResult") .setResult("OK"); return JSONObject.toJSONString(requestDTO); } } } log.info("认证失败,返回给客户端"); requestDTO.setRequest("authenticationResult") .setResult("认证失败"); return JSONObject.toJSONString(requestDTO); } /** * 上报采集站信息 * @param responseDTO * @return */ private String getCollectionInfo(ResponseDTO responseDTO, ChannelHandlerContext ctx) { log.info("收到登录请求,装备柜信息:{}", responseDTO); RequestDTO requestDTO = new RequestDTO(); // 工作站 
CollectionDTO collectionDTO = new CollectionDTO(); if (ObjectUtils.isEmpty(responseDTO) || ObjectUtils.isEmpty( responseDTO.getId()) || ObjectUtils.isEmpty(responseDTO.getLockStatus())) { log.info("由于请求内容是空的 或者 装备柜ID 或者 锁状态 是空的---处理请求结束"); return null; } // 将各柜门状态缓存下来 getStatus(responseDTO, ctx); // 先根据 ID 查询采集站是否 存在 // 如果 采集站 存在 ,则 更新采集站信息,否则 添加采集站 // 工作站ID collectionDTO.setCollectionId(responseDTO.getId().toString()); // 查询到采集站信息 List<Collection> collectionList = collectionMapper.getCollection( new Collection().setCollectionId(responseDTO.getId())); // 先将收到的信息set 进去 collectionDTO.setCollectionIp(responseDTO.getIp()) .setCollectionVersion(responseDTO.getVersion()); if (collectionList.size() > 0) { // 存在采集站,则更新 log.info("list:{}", collectionList); CommonResult commonResult = collectionService.updateCollection(collectionDTO); if (commonResult.getCode() == 200) { // 更新成功 if (!ObjectUtils.isEmpty(collectionList.get(0).getDepartment())) { // 如果采集站对应单位信息不为空 requestDTO.setDepartmentId(collectionList.get(0).getDepartment().getDepartmentId()) .setDepartmentName(collectionList.get(0).getDepartment().getDepartmentName()); } requestDTO.setResponse("login") .setStatus(200); return JSONObject.toJSONString(requestDTO); } } // 不存在采集站,则添加 CommonResult commonResult = null; try { collectionDTO.setCollectionDepartmentId("0"); commonResult = collectionService.addCollection(collectionDTO); } catch (ParseException e) { throw new RuntimeException(e); } if (commonResult.getCode() == 200) { requestDTO.setResponse("login") .setStatus(200); return JSONObject.toJSONString(requestDTO); } requestDTO.setResponse("login") .setStatus(-1); return JSONObject.toJSONString(requestDTO); } /** * 心跳请求 * @return */ private String getHeartbeat(ChannelHandlerContext ctx) { log.info("收到心跳请求:{}", ctx.channel().id()); HeartCache.getInstance().putValue(ctx.channel(), System.currentTimeMillis(), 2 * 60); RequestDTO requestDTO = new RequestDTO(); requestDTO.setResponse("heartbeat") .setStatus(200); return 
JSONObject.toJSONString(requestDTO); } /** * 获取各柜门状态 * @param responseDTO */ private void getLockStatus(ResponseDTO responseDTO, ChannelHandlerContext ctx) { // 获取各柜门状态 getStatus(responseDTO, ctx); } private Boolean strToBool(String status) { if ("1".equals(status)) { return true; } return false; } /** * 获取各柜门状态 * @param responseDTO * @param ctx */ private void getStatus(ResponseDTO responseDTO, ChannelHandlerContext ctx) { log.info("将 装备柜信息 缓存下来"); if (ObjectUtils.isEmpty(responseDTO.getLockStatus()) || ObjectUtils.isEmpty(responseDTO.getBoxName())) { log.info("由于 锁状态 或者 柜内物品名称 是空的---则只将 工作站ID 记录到 内存中"); for (SocketSession socketSession : WebsocketMessageHandler.AllSocket) { if (socketSession.getChannel() == ctx.channel()) { socketSession.setCabinetId(responseDTO.getId()); } break; } return; } // 状态数组 String[] status = responseDTO.getLockStatus().split(",", -1); log.info("锁状态数组长度:{}", status.length); // 柜内物品名称 String[] name = responseDTO.getBoxName().split(",", -1); log.info("柜内物品名称数组长度:{}", name.length); // 先找到 此通道 对应的socket for (SocketSession socketSession : WebsocketMessageHandler.AllSocket) { if (socketSession.getChannel() == ctx.channel()) { // 锁状态集合 Map<Integer, Boolean> statusMap = new HashMap<>(); // 物品名称集合 Map<Integer, String> boxMap = new HashMap<>(); for (int i = 0; i < status.length; i++) { statusMap.put(i + 1, strToBool(status[i])); boxMap.put(i + 1, name[i]); } if (!ObjectUtils.isEmpty(responseDTO.getId())) { socketSession.setCabinetId(responseDTO.getId()); } socketSession.setRows(responseDTO.getRows()) .setCols(responseDTO.getCols()) .setLockStatusMap(statusMap) .setBoxNameMap(boxMap); break; } } } /** * 获取人员信息 * @param responseDTO * @return */ private String getPersonInfo(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); List<Person> allPerson = personMapper.getAllPerson(); requestDTO.setResponse("personInfo") .setStatus(200) .setPersons(allPerson); log.info("人员对象:{}", requestDTO); log.info("人员对象转JSON:{}", 
JSONObject.toJSONString(requestDTO)); return JSONObject.toJSONString(requestDTO); } /** * 开锁日志 * @param responseDTO * @return */ private String addLockLog(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (!ObjectUtils.isEmpty(responseDTO) && !ObjectUtils.isEmpty(responseDTO.getHandlerType()) && !ObjectUtils.isEmpty(responseDTO.getHandler()) && !ObjectUtils.isEmpty(responseDTO.getContent())) { LocklogDTO locklogDTO = new LocklogDTO(); locklogDTO.setLocklogHandlerType(responseDTO.getHandlerType().toString()) .setLocklogHandler(responseDTO.getHandler()) .setLocklogContent(responseDTO.getContent()); CommonResult commonResult = locklogService.addLockLog(locklogDTO); if (commonResult.getCode() == 200) { requestDTO.setResponse("unlockLog") .setStatus(200); return JSONObject.toJSONString(requestDTO); } } log.info("开锁日志为空 或者 操作类型为空 或者 操作人员为空 或者 操作内容为空 -- 处理结束"); requestDTO.setResponse("unlockLog") .setStatus(-1); return JSONObject.toJSONString(requestDTO); } /** * 人员开门查询 * @param responseDTO * @return */ private String getDoorTask(ResponseDTO responseDTO) { if (ObjectUtils.isEmpty(responseDTO.getPersonId())) { log.info("人员编号为空---开门任务处理结束"); return null; } RequestDTO requestDTO = new RequestDTO(); Command command = new Command(); command.setCommandPersonId(responseDTO.getPersonId()); List<Command> commandList = commandMapper.getCommandByPersonId(responseDTO.getPersonId(), new Date()); if (commandList.size() > 0) { requestDTO.setResponse("getOpenDoorCommand") .setStatus(200) .setCommand(commandList); return JSONObject.toJSONString(requestDTO); } requestDTO.setResponse("getOpenDoorCommand") .setStatus(200); return JSONObject.toJSONString(requestDTO); } private String reciveDoor(ResponseDTO responseDTO) { if (ObjectUtils.isEmpty(responseDTO.getCommandId())) { log.info("命令主键编号为空---结束处理"); return null; } RequestDTO requestDTO = new RequestDTO(); Command command = new Command(); command.setCommandId(responseDTO.getCommandId()) .setCommandComplete(true) 
.setCommandCompleteTime(new Date()); Integer updateCommand = commandMapper.updateCommand(command); if (updateCommand > 0) { requestDTO.setRequest("completeOpenDoorCommand") .setStatus(200); return JSONObject.toJSONString(requestDTO); } requestDTO.setRequest("completeOpenDoorCommand") .setStatus(-1); return JSONObject.toJSONString(requestDTO); } }
心跳
/**
 * One cache entry: a value together with the time it was stored and its lifetime.
 */
public class CacheEntity implements Serializable {

    private static final long serialVersionUID = 3055325810872798183L;

    private Object value;

    /** Timestamp recorded when the entry was saved. */
    private long gmtModify;

    /** Expiry time of the entry. */
    private int expire;

    public CacheEntity(Object value, long gmtModify, int expire) {
        super();
        this.value = value;
        this.gmtModify = gmtModify;
        this.expire = expire;
    }

    /** Returns the stored value, or {@code null} when it is considered empty. */
    public Object getValue() {
        return ObjectUtils.isEmpty(value) ? null : value;
    }

    public void setValue(Object value) {
        this.value = value;
    }

    public long getGmtModify() {
        return gmtModify;
    }

    public void setGmtModify(long gmtModify) {
        this.gmtModify = gmtModify;
    }

    public int getExpire() {
        return expire;
    }

    public void setExpire(int expire) {
        this.expire = expire;
    }
}
@Slf4j public class HeartCache { private static final int DEFAULT_CAPACITY = 512; /** * 最大容量 */ private static final int MAX_CAPACITY = 100000; /** * 刷新缓存的频率 */ private static final int MONITOR_DURATION = 2; // 启动监控线程 static { new Thread(new TimeoutTimerThread()).start(); } // 内部类方式实现单例 private static class HeartCacheInstance { private static final HeartCache INSTANCE = new HeartCache(); } public static HeartCache getInstance() { return HeartCache.HeartCacheInstance.INSTANCE; } private HeartCache() { } /** * 使用默认容量创建一个Map */ private static Map<Channel, CacheEntity> heartExpire = new ConcurrentHashMap<>(DEFAULT_CAPACITY); /** * 将key-value保存到本地缓存并制定该缓存的过期时间 * @param key * @param value * @param expireTime 过期时间,如果是-1 则表示永不过期 * @return * @param <T> */ public <T> boolean putValue(Channel key, T value, int expireTime) { return putCloneValue(key, value, expireTime); } /** * 将值通过序列化clone 处理后保存到缓存中,可以解决值引用的问题 * @param key * @param value * @param expireTime * @return * @param <T> */ private <T> boolean putCloneValue(Channel key, T value, int expireTime) { try { if (heartExpire.size() >= MAX_CAPACITY) { return false; } // 序列化赋值 CacheEntity entityClone = clone(new CacheEntity(value, System.nanoTime(), expireTime)); heartExpire.put(key, entityClone); return true; } catch (Exception e) { log.error("添加缓存失败:{}", e.getMessage()); } return false; } /** * 序列化 克隆处理 * @param object * @return * @param <E> */ private <E extends Serializable> E clone(E object) { E cloneObject = null; try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(object); oos.close(); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); ObjectInputStream ois = new ObjectInputStream(bais); cloneObject = (E) ois.readObject(); ois.close(); } catch (Exception e) { log.error("缓存序列化失败:{}", e.getMessage()); } return cloneObject; } /** * 从本地缓存中获取key对应的值,如果该值不存则则返回null * @param key * @return */ public Object getValue(Channel 
key) { if (CollectionUtils.isEmpty(heartExpire)) { return null; } CacheEntity cacheEntity = heartExpire.get(key); if (ObjectUtils.isEmpty(cacheEntity)) { return null; } return cacheEntity.getValue(); } public void remove(Channel key) { if (CollectionUtils.isEmpty(heartExpire)) { return; } CacheEntity cacheEntity = heartExpire.get(key); if (ObjectUtils.isEmpty(cacheEntity)) { return; } heartExpire.remove(key); } public Integer count() { return heartExpire.size(); } /** * 清空所有 */ public void clear() { heartExpire.clear(); } /** * 过期处理线程 */ static class TimeoutTimerThread implements Runnable { @Override public void run() { while (true) { try { TimeUnit.SECONDS.sleep(MONITOR_DURATION); checkTime(); } catch (Exception e) { log.error("过期缓存清理失败:{}", e.getMessage()); } } } /** * 过期缓存的具体处理方法 * * @throws Exception */ private void checkTime() throws Exception { // 开始处理过期 for (Channel key : heartExpire.keySet()) { CacheEntity tce = heartExpire.get(key); long timoutTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - tce.getGmtModify()); // 过期时间 : timoutTime if (tce.getExpire() > timoutTime) { continue; } log.info(" 清除过期缓存 :{}", key); //清除过期缓存和删除对应的缓存队列 heartExpire.remove(key); log.info("断开客户端连接:{}", key.id()); key.disconnect(); } } } }
消息处理类:
/** * 消息处理,单例启动 * * @author qiding */ @Slf4j @Component @ChannelHandler.Sharable @RequiredArgsConstructor public class MessageHandler extends SimpleChannelInboundHandler<Object> { private final WebsocketMessageHandler websocketHandler; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { log.info("FullHttpRequest"); FullHttpRequest request = (FullHttpRequest) msg; //处理握手数据 // 首次握手进行校验 isFullHttpRequest(ctx, request); // 获取请求uri String uri = request.uri(); // 参数分别是 (ws地址,子协议,是否扩展,最大frame长度) WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory( getWebSocketLocation(request), null, true, 5 * 1024 * 1024); WebSocketServerHandshaker handShaker = factory.newHandshaker(request); if (handShaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handShaker.handshake(ctx.channel(), request); } WebSocketSession.setChannelShaker(ctx.channel().id(), handShaker); //握手成功 连接建立完成 websocketHandler.online(ctx); } else if (msg instanceof PingWebSocketFrame) { log.info("PingWebSocketFrame"); // 处理握手PING/PONG PingWebSocketFrame pingWebSocketFrame = (PingWebSocketFrame) msg; ctx.writeAndFlush(new PongWebSocketFrame(pingWebSocketFrame.content().retain())); } else if (msg instanceof TextWebSocketFrame) { log.info("TextWebSocketFrame"); //处理websocket数据(字符串) websocketHandler.receivedMessage(ctx, (TextWebSocketFrame) msg); } } //客户端掉线 @Override public void channelInactive(ChannelHandlerContext ctx) { log.info("断开连接"); /* // 释放缓存 ChannelStore.closeAndClean(ctx); // 断开连接,删除 map GlobalUtil.SESSIONMAP.remove(ctx.channel().id()); WebSocketSession.clear(ctx.channel().id()); */ websocketHandler.offline(ctx); } //新的客户端 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("成功建立连接,channelId:{}", ctx.channel().id()); super.channelActive(ctx); } //以下是工具类方法 不做具体数据处理 /** * 判断是否是正确的websocket 握手协议 */ private static 
void isFullHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) { if (!request.decoderResult().isSuccess()) { log.error("非webSocket请求"); sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.BAD_REQUEST, ctx.alloc().buffer())); ctx.close(); return; } if (!HttpMethod.GET.equals(request.method())) { log.error("非GET请求"); sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer())); ctx.close(); } } /** * SSL支持采用wss: */ private static String getWebSocketLocation(FullHttpRequest request) { return "ws://" + request.headers().get(HttpHeaderNames.HOST) + "/ws"; } private static void sendResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse resp) { HttpResponseStatus status = resp.status(); if (status != HttpResponseStatus.OK) { ByteBufUtil.writeUtf8(resp.content(), status.toString()); HttpUtil.setContentLength(req, resp.content().readableBytes()); } boolean keepAlive = HttpUtil.isKeepAlive(req) && status == HttpResponseStatus.OK; HttpUtil.setKeepAlive(req, keepAlive); ChannelFuture future = ctx.write(resp); if (!keepAlive) { future.addListener(ChannelFutureListener.CLOSE); } } }
/** * Websocket 消息处理器 * * @author qiding */ @Slf4j @Component public class WebsocketMessageHandler { private final OperateUtil operateUtil; public WebsocketMessageHandler(OperateUtil operateUtil) { this.operateUtil = operateUtil; } /** * 存储所有在线的连接 */ public static final List<SocketSession> AllSocket = new ArrayList<>(); public void online(ChannelHandlerContext ctx) { //1. 立马发送验证请求 告知终端需要验证 long key = System.currentTimeMillis(); String jsonRequest = "{\"request\":\"authenticate\",\"key\":\"" + key + "\"}"; //new一个缓存对象 保存该连接的信息 SocketSession socketSession = new SocketSession(); socketSession.setAuthenticationKey(key) .setLastHeartbeatTime(System.currentTimeMillis()) .setChannel(ctx.channel()); AllSocket.add(socketSession); // 有socket连接,将连接存入 redis HeartCache.getInstance().putValue(ctx.channel(), System.currentTimeMillis(), 2 * 60); log.info("心跳总共连接数:{}", HeartCache.getInstance().count()); //send TextWebSocketFrame frame = new TextWebSocketFrame(jsonRequest); ctx.writeAndFlush(frame); } public void offline(ChannelHandlerContext ctx) { // 从心跳集合中移除 该连接 HeartCache.getInstance().remove(ctx.channel()); log.info("剩余心跳连接数:{}", HeartCache.getInstance().count()); //1. 从在线列表中移除该连接 for (int i = 0; i < AllSocket.size(); i++) { SocketSession ca = AllSocket.get(i); if (ca.getChannel() == ctx.channel()) { AllSocket.remove(i); break; } } } public void receivedMessage(ChannelHandlerContext ctx, TextWebSocketFrame msg) { //1. 
将websocket的字符串消息转换成Json对象 JSONObject jsonObject = JSONObject.parseObject(msg.text()); if (jsonObject.isEmpty()) { return; } if (msg.text().equals("1") || msg.text().equals("2")) { ctx.writeAndFlush(new TextWebSocketFrame(String.valueOf(HeartCache.getInstance().count()))); } log.info("接收到的消息:{}", msg.text()); ResponseDTO responseDTO = JSON.parseObject(msg.text(), ResponseDTO.class); String operateMsg = operateUtil.ManyOperate(responseDTO, ctx); if (!ObjectUtils.isEmpty(operateMsg)) { TextWebSocketFrame frame = new TextWebSocketFrame(operateMsg); ctx.writeAndFlush(frame); } } /** * 向某一个socket连接推送消息 */ public static void sendMsgToClient(long cabinetId, String msg) { //在AllSocket中找到cabinetId对应的连接,将该json消息推送出去 for (int i = 0; i < AllSocket.size(); i++) { SocketSession ca = AllSocket.get(i); if (ca.getCabinetId() == cabinetId) { ca.getChannel().writeAndFlush(new TextWebSocketFrame(msg)); break; } } } }
Netty通道初始化:
/** * Netty 通道初始化 * * @author qiding */ @Component @RequiredArgsConstructor public class ChannelInit extends ChannelInitializer<SocketChannel> { private final MessageHandler messageHandler; @Override protected void initChannel(SocketChannel channel) { channel.pipeline() // 心跳时间 // 对http协议的支持. .addLast(new HttpServerCodec()) // 对大数据流的支持 .addLast(new ChunkedWriteHandler()) // 聚合 Http 将多个requestLine、requestHeader、messageBody信息转化成单一的request或者response对象 .addLast(new HttpObjectAggregator(8192)) // 聚合 websocket 的数据帧,因为客户端可能分段向服务器端发送数据 .addLast(new WebSocketFrameAggregator(1024 * 62)) // 添加消息处理器 .addLast("messageHandler", messageHandler); } }
TCPServer
/**
 * Lifecycle contract of the TCP server.
 */
public interface ITcpServer {

    /**
     * Main entry point: initializes parameters and starts the server.
     *
     * @throws Exception if initialization fails
     */
    void start() throws Exception;

    /**
     * Shuts the server down gracefully.
     *
     * @throws InterruptedException if the shutdown is interrupted early
     */
    @PreDestroy
    void destroy() throws InterruptedException;
}
@Component @Slf4j @RequiredArgsConstructor public class TcpServer implements ITcpServer { private final ChannelInit channelInit; private final ServerProperties serverProperties; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; @Override public void start() throws Exception { log.info("初始化 TCP server ..."); bossGroup = serverProperties.isUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup(); workerGroup = serverProperties.isUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup(); this.tcpServer(); } /** * 初始化 */ private void tcpServer() { try { new ServerBootstrap() .group(bossGroup, workerGroup) .channel( serverProperties.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .localAddress(new InetSocketAddress(serverProperties.getPort())) // 配置 编码器、解码器、业务处理 .childHandler(channelInit) // tcp缓冲区 .option(ChannelOption.SO_BACKLOG, 128) // 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true .childOption(ChannelOption.TCP_NODELAY, false) // 保持长连接 .childOption(ChannelOption.SO_KEEPALIVE, true) // 绑定端口,开始接收进来的连接 .bind().sync(); log.info("websocket server启动成功!开始监听端口:{}", serverProperties.getPort()); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** * 销毁 * @throws InterruptedException */ @PreDestroy @Override public void destroy() throws InterruptedException { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
在启动类中调用启动方法:
/**
 * Starts the TCP server once the application has booted.
 *
 * @param args application arguments (unused here)
 * @throws Exception if the server fails to start
 */
@Override
public void run(String... args) throws Exception {
    tcpServer.start();
}
到此,这篇关于Netty搭建WebSocket服务器的文章就介绍到这了。更多相关Netty WebSocket服务器的内容,请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!