基于SpringBoot实现多线程多主机TCP通信
作者:IT界Tony哥
这篇文章主要为大家详细介绍了如何基于SpringBoot实现多线程多主机TCP通信,包括发送数据和接收应答并解析,感兴趣的小伙伴可以跟随小编一起学习一下
下面我将介绍如何使用 Spring Boot 实现多线程、多主机的 TCP 通信,包括发送数据和接收应答并解析。
1. 项目结构
src/main/java/com/example/tcpdemo/
├── config
│ └── TcpClientConfig.java
├── controller
│ └── TcpController.java
├── handler
│ ├── TcpClientHandler.java
│ └── TcpResponseHandler.java
├── model
│ └── TcpHost.java
├── service
│ ├── TcpClientService.java
│ └── impl
│ └── TcpClientServiceImpl.java
└── TcpDemoApplication.java
2. 核心代码实现
2.1 配置类
// TcpClientConfig.java @Configuration public class TcpClientConfig { @Value("${tcp.client.thread-pool-size:10}") private int threadPoolSize; @Bean public ExecutorService tcpClientExecutor() { return Executors.newFixedThreadPool(threadPoolSize); } }
2.2 TCP主机模型
// TcpHost.java @Data @AllArgsConstructor @NoArgsConstructor public class TcpHost { private String host; private int port; private String name; // 主机标识名称 }
2.3 TCP客户端处理器
// TcpClientHandler.java @Component public class TcpClientHandler { @Autowired private ExecutorService tcpClientExecutor; @Autowired private TcpResponseHandler responseHandler; public void sendToMultipleHosts(List<TcpHost> hosts, String message) { hosts.forEach(host -> { tcpClientExecutor.execute(() -> { try (Socket socket = new Socket(host.getHost(), host.getPort()); PrintWriter out = new PrintWriter(socket.getOutputStream(), true); BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) { // 发送数据 out.println(message); System.out.println("Sent to " + host.getName() + ": " + message); // 接收响应 String response = in.readLine(); responseHandler.handleResponse(host, response); } catch (IOException e) { System.err.println("Error communicating with " + host.getName() + ": " + e.getMessage()); } }); }); } }
2.4 响应处理器
// TcpResponseHandler.java @Component public class TcpResponseHandler { public void handleResponse(TcpHost host, String response) { // 这里实现你的响应解析逻辑 System.out.println("Received from " + host.getName() + ": " + response); // 示例解析逻辑 if (response != null) { // 假设响应格式为 "status|data" String[] parts = response.split("\|"); if (parts.length == 2) { String status = parts[0]; String data = parts[1]; System.out.println("Parsed response - Status: " + status + ", Data: " + data); } } } }
2.5 TCP服务接口
// TcpClientService.java public interface TcpClientService { void sendToHosts(List<TcpHost> hosts, String message); }
2.6 TCP服务实现
// TcpClientServiceImpl.java @Service public class TcpClientServiceImpl implements TcpClientService { @Autowired private TcpClientHandler tcpClientHandler; @Override public void sendToHosts(List<TcpHost> hosts, String message) { tcpClientHandler.sendToMultipleHosts(hosts, message); } }
2.7 控制器
// TcpController.java @RestController @RequestMapping("/api/tcp") public class TcpController { @Autowired private TcpClientService tcpClientService; @PostMapping("/send") public ResponseEntity<String> sendMessage(@RequestBody Map<String, Object> request) { List<Map<String, Object>> hostsInfo = (List<Map<String, Object>>) request.get("hosts"); String message = (String) request.get("message"); List<TcpHost> hosts = hostsInfo.stream() .map(info -> new TcpHost( (String) info.get("host"), (Integer) info.get("port"), (String) info.get("name"))) .collect(Collectors.toList()); tcpClientService.sendToHosts(hosts, message); return ResponseEntity.ok("Messages sent successfully"); } }
3. 应用配置
在 application.properties
或 application.yml
中添加配置:
# TCP客户端线程池大小 tcp.client.thread-pool-size=20 # 服务器端口 server.port=8080
4. 使用示例
4.1 启动Spring Boot应用
// TcpDemoApplication.java @SpringBootApplication public class TcpDemoApplication { public static void main(String[] args) { SpringApplication.run(TcpDemoApplication.class, args); } }
4.2 发送请求
使用Postman或curl发送POST请求:
curl -X POST http://localhost:8080/api/tcp/send \ -H "Content-Type: application/json" \ -d '{ "hosts": [ {"host": "127.0.0.1", "port": 12345, "name": "Server1"}, {"host": "127.0.0.1", "port": 12346, "name": "Server2"} ], "message": "Hello TCP Server" }'
5. 高级功能扩展
5.1 连接池管理
// 可以扩展为使用连接池管理TCP连接 @Component public class TcpConnectionPool { private Map<String, Socket> connectionPool = new ConcurrentHashMap<>(); public Socket getConnection(TcpHost host) throws IOException { String key = host.getHost() + ":" + host.getPort(); if (!connectionPool.containsKey(key) || connectionPool.get(key).isClosed()) { connectionPool.put(key, new Socket(host.getHost(), host.getPort())); } return connectionPool.get(key); } public void closeAll() { connectionPool.values().forEach(socket -> { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } }); connectionPool.clear(); } }
5.2 超时设置
// 在TcpClientHandler中添加超时设置 public void sendToMultipleHosts(List<TcpHost> hosts, String message) { hosts.forEach(host -> { tcpClientExecutor.execute(() -> { try { Socket socket = new Socket(); socket.connect(new InetSocketAddress(host.getHost(), host.getPort()), 5000); // 5秒连接超时 socket.setSoTimeout(10000); // 10秒读取超时 try (PrintWriter out = new PrintWriter(socket.getOutputStream(), true); BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) { out.println(message); System.out.println("Sent to " + host.getName() + ": " + message); String response = in.readLine(); responseHandler.handleResponse(host, response); } } catch (SocketTimeoutException e) { System.err.println("Timeout when communicating with " + host.getName()); } catch (IOException e) { System.err.println("Error communicating with " + host.getName() + ": " + e.getMessage()); } }); }); }
5.3 自定义协议解析
// 扩展TcpResponseHandler实现更复杂的协议解析 public void handleResponse(TcpHost host, String response) { try { // 示例:解析JSON格式响应 JSONObject jsonResponse = new JSONObject(response); String status = jsonResponse.getString("status"); String data = jsonResponse.getString("data"); long timestamp = jsonResponse.getLong("timestamp"); System.out.printf("Response from %s - Status: %s, Data: %s, Time: %tF %<tT%n", host.getName(), status, data, new Date(timestamp)); } catch (JSONException e) { System.err.println("Invalid response format from " + host.getName()); } }
6. 注意事项
- 线程安全:确保在多线程环境下共享资源的线程安全
- 资源释放:正确关闭Socket、流等资源
- 异常处理:合理处理各种网络异常
- 性能优化:根据实际需求调整线程池大小
- 日志记录:添加详细的日志记录以便排查问题
这个实现提供了基本的TCP多线程通信框架,你可以根据实际需求进行扩展和优化。
到此这篇关于基于SpringBoot实现多线程多主机TCP通信的文章就介绍到这了,更多相关SpringBoot TCP通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!