java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot TCP通信

基于SpringBoot实现多线程多主机TCP通信

作者:IT界Tony哥

这篇文章主要为大家详细介绍了如何基于SpringBoot实现多线程多主机TCP通信,包括发送数据和接收应答并解析,感兴趣的小伙伴可以跟随小编一起学习一下

下面我将介绍如何使用 Spring Boot 实现多线程、多主机的 TCP 通信,包括发送数据和接收应答并解析。

1. 项目结构

src/main/java/com/example/tcpdemo/
├── config
│   └── TcpClientConfig.java
├── controller
│   └── TcpController.java
├── handler
│   ├── TcpClientHandler.java
│   └── TcpResponseHandler.java
├── model
│   └── TcpHost.java
├── service
│   ├── TcpClientService.java
│   └── impl
│       └── TcpClientServiceImpl.java
└── TcpDemoApplication.java

2. 核心代码实现

2.1 配置类

// TcpClientConfig.java
@Configuration
public class TcpClientConfig {
    
    @Value("${tcp.client.thread-pool-size:10}")
    private int threadPoolSize;
    
    @Bean
    public ExecutorService tcpClientExecutor() {
        return Executors.newFixedThreadPool(threadPoolSize);
    }
}

2.2 TCP主机模型

// TcpHost.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TcpHost {
    private String host;
    private int port;
    private String name; // 主机标识名称
}

2.3 TCP客户端处理器

// TcpClientHandler.java
@Component
public class TcpClientHandler {
    
    @Autowired
    private ExecutorService tcpClientExecutor;
    
    @Autowired
    private TcpResponseHandler responseHandler;
    
    public void sendToMultipleHosts(List<TcpHost> hosts, String message) {
        hosts.forEach(host -> {
            tcpClientExecutor.execute(() -> {
                try (Socket socket = new Socket(host.getHost(), host.getPort());
                     PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                     BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                    
                    // 发送数据
                    out.println(message);
                    System.out.println("Sent to " + host.getName() + ": " + message);
                    
                    // 接收响应
                    String response = in.readLine();
                    responseHandler.handleResponse(host, response);
                    
                } catch (IOException e) {
                    System.err.println("Error communicating with " + host.getName() + ": " + e.getMessage());
                }
            });
        });
    }
}

2.4 响应处理器

// TcpResponseHandler.java
@Component
public class TcpResponseHandler {
    
    public void handleResponse(TcpHost host, String response) {
        // 这里实现你的响应解析逻辑
        System.out.println("Received from " + host.getName() + ": " + response);
        
        // 示例解析逻辑
        if (response != null) {
            // 假设响应格式为 "status|data"
            String[] parts = response.split("\|");
            if (parts.length == 2) {
                String status = parts[0];
                String data = parts[1];
                System.out.println("Parsed response - Status: " + status + ", Data: " + data);
            }
        }
    }
}

2.5 TCP服务接口

// TcpClientService.java
public interface TcpClientService {
    void sendToHosts(List<TcpHost> hosts, String message);
}

2.6 TCP服务实现

// TcpClientServiceImpl.java
@Service
public class TcpClientServiceImpl implements TcpClientService {
    
    @Autowired
    private TcpClientHandler tcpClientHandler;
    
    @Override
    public void sendToHosts(List<TcpHost> hosts, String message) {
        tcpClientHandler.sendToMultipleHosts(hosts, message);
    }
}

2.7 控制器

// TcpController.java
@RestController
@RequestMapping("/api/tcp")
public class TcpController {
    
    @Autowired
    private TcpClientService tcpClientService;
    
    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody Map<String, Object> request) {
        List<Map<String, Object>> hostsInfo = (List<Map<String, Object>>) request.get("hosts");
        String message = (String) request.get("message");
        
        List<TcpHost> hosts = hostsInfo.stream()
            .map(info -> new TcpHost(
                (String) info.get("host"),
                (Integer) info.get("port"),
                (String) info.get("name")))
            .collect(Collectors.toList());
        
        tcpClientService.sendToHosts(hosts, message);
        
        return ResponseEntity.ok("Messages sent successfully");
    }
}

3. 应用配置

application.propertiesapplication.yml中添加配置:

# TCP客户端线程池大小
tcp.client.thread-pool-size=20

# 服务器端口
server.port=8080

4. 使用示例

4.1 启动Spring Boot应用

// TcpDemoApplication.java
@SpringBootApplication
public class TcpDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(TcpDemoApplication.class, args);
    }
}

4.2 发送请求

使用Postman或curl发送POST请求:

curl -X POST http://localhost:8080/api/tcp/send \
-H "Content-Type: application/json" \
-d '{
    "hosts": [
        {"host": "127.0.0.1", "port": 12345, "name": "Server1"},
        {"host": "127.0.0.1", "port": 12346, "name": "Server2"}
    ],
    "message": "Hello TCP Server"
}'

5. 高级功能扩展

5.1 连接池管理

// 可以扩展为使用连接池管理TCP连接
@Component
public class TcpConnectionPool {
    private Map<String, Socket> connectionPool = new ConcurrentHashMap<>();
    
    public Socket getConnection(TcpHost host) throws IOException {
        String key = host.getHost() + ":" + host.getPort();
        if (!connectionPool.containsKey(key) || connectionPool.get(key).isClosed()) {
            connectionPool.put(key, new Socket(host.getHost(), host.getPort()));
        }
        return connectionPool.get(key);
    }
    
    public void closeAll() {
        connectionPool.values().forEach(socket -> {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        connectionPool.clear();
    }
}

5.2 超时设置

// 在TcpClientHandler中添加超时设置
public void sendToMultipleHosts(List<TcpHost> hosts, String message) {
    hosts.forEach(host -> {
        tcpClientExecutor.execute(() -> {
            try {
                Socket socket = new Socket();
                socket.connect(new InetSocketAddress(host.getHost(), host.getPort()), 5000); // 5秒连接超时
                socket.setSoTimeout(10000); // 10秒读取超时
                
                try (PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                     BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                    
                    out.println(message);
                    System.out.println("Sent to " + host.getName() + ": " + message);
                    
                    String response = in.readLine();
                    responseHandler.handleResponse(host, response);
                }
            } catch (SocketTimeoutException e) {
                System.err.println("Timeout when communicating with " + host.getName());
            } catch (IOException e) {
                System.err.println("Error communicating with " + host.getName() + ": " + e.getMessage());
            }
        });
    });
}

5.3 自定义协议解析

// 扩展TcpResponseHandler实现更复杂的协议解析
public void handleResponse(TcpHost host, String response) {
    try {
        // 示例:解析JSON格式响应
        JSONObject jsonResponse = new JSONObject(response);
        String status = jsonResponse.getString("status");
        String data = jsonResponse.getString("data");
        long timestamp = jsonResponse.getLong("timestamp");
        
        System.out.printf("Response from %s - Status: %s, Data: %s, Time: %tF %<tT%n",
            host.getName(), status, data, new Date(timestamp));
        
    } catch (JSONException e) {
        System.err.println("Invalid response format from " + host.getName());
    }
}

6. 注意事项

这个实现提供了基本的TCP多线程通信框架,你可以根据实际需求进行扩展和优化。

到此这篇关于基于SpringBoot实现多线程多主机TCP通信的文章就介绍到这了,更多相关SpringBoot TCP通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文