Flutter中实现TCP通信的关键步骤与代码示例
作者:Dabei
在移动端开发中,除了常见的 HTTP、MQTT 之外,很多场景需要直接使用 TCP 通信,例如局域网设备控制、实时传输等,本文将介绍在 Flutter/Dart 中实现一个 TCP 客户端的基本过程,并解析关键代码点,需要的朋友可以参考下
引言
在移动端开发中,除了常见的 HTTP、MQTT 之外,很多场景需要直接使用 TCP 通信,例如局域网设备控制、实时传输等。本文将介绍在 Flutter/Dart 中实现一个 TCP 客户端的基本过程,并解析关键代码点。文章同时给出 自动重连 与 心跳保活 的完整示例代码,便于直接落地。
1. 基本思路
- 使用
Socket.connect
建立连接 - 将
socket
转换为 流(Stream) 进行监听,实时接收消息 - 使用
LineSplitter
按行切分消息,避免 TCP 粘包/分包问题(前提:每条消息以\n
结尾,且内容不含换行) - 加入 超时/心跳 与 自动重连(指数退避 + 抖动)
2. 建立 TCP 连接(明文)
import 'dart:io'; import 'dart:async'; import 'dart:convert'; class TcpClient { final String host; final int port; Socket? _socket; StreamSubscription<String>? _subscription; TcpClient(this.host, this.port); Future<void> connect() async { try { _socket = await Socket.connect(host, port, timeout: const Duration(seconds: 5)); print('✅ Connected to: ${_socket!.remoteAddress.address}:${_socket!.remotePort}'); _subscription = _socket! .transform(utf8.decoder) .transform(const LineSplitter()) .listen(_onLine, onError: _onError, onDone: _onDone); } catch (e) { print('🚫 Connection failed: $e'); rethrow; } } void _onLine(String line) { print('📩 Received line: $line'); } void _onError(Object e, [StackTrace? st]) { print('❌ Socket error: $e'); disconnect(); } void _onDone() { print('⚠️ Server closed connection'); disconnect(); } void send(String message) { final s = _socket; if (s != null) { s.write(message + '\n'); // 每条消息后加换行 print('📤 Sent: $message'); } } void disconnect() { _subscription?.cancel(); _subscription = null; _socket?.destroy(); _socket = null; print('🔌 Disconnected'); } }
3. 心跳与空闲超时
class HeartbeatManager { final void Function() onSendHeartbeat; final Duration interval; Timer? _timer; HeartbeatManager({required this.onSendHeartbeat, this.interval = const Duration(seconds: 30)}); void start() { _timer ??= Timer.periodic(interval, (_) => onSendHeartbeat()); } void stop() { _timer?.cancel(); _timer = null; } }
4. 自动重连(指数退避 + 抖动)
import 'dart:math'; class ReconnectPolicy { final Duration minBackoff; final Duration maxBackoff; int _attempt = 0; final Random _rnd = Random(); ReconnectPolicy({this.minBackoff = const Duration(seconds: 1), this.maxBackoff = const Duration(seconds: 30)}); Duration nextDelay() { final base = minBackoff.inMilliseconds * pow(2, _attempt).toInt(); final capped = min(base, maxBackoff.inMilliseconds); final jitter = (capped * (0.2 * (_rnd.nextDouble() * 2 - 1))).round(); _attempt = min(_attempt + 1, 10); return Duration(milliseconds: max(0, capped + jitter)); } void reset() => _attempt = 0; }
5. 最佳实践小结
- 行分隔协议:确保发送端每条消息都以
\n
结尾,且消息体不包含换行 - 统一编码:收发都用 UTF‑8
- 心跳保活:15–30 秒 1 次,收不到响应 → 重连
- 自动重连:指数退避 + 抖动
- 超时治理:连接超时、请求超时、空闲超时
- 可观测性:埋点连接时延、失败原因、重连次数、心跳 RTT 等
6. 完整示例代码(可直接运行)
import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:math'; class ReconnectPolicy { final Duration minBackoff; final Duration maxBackoff; int _attempt = 0; final Random _rnd = Random(); ReconnectPolicy({this.minBackoff = const Duration(seconds: 1), this.maxBackoff = const Duration(seconds: 30)}); Duration nextDelay() { final base = minBackoff.inMilliseconds * pow(2, _attempt).toInt(); final capped = min(base, maxBackoff.inMilliseconds); final jitter = (capped * (0.2 * (_rnd.nextDouble() * 2 - 1))).round(); _attempt = min(_attempt + 1, 10); return Duration(milliseconds: max(0, capped + jitter)); } void reset() => _attempt = 0; } class HeartbeatManager { final void Function() onSendHeartbeat; final Duration interval; Timer? _timer; HeartbeatManager({required this.onSendHeartbeat, this.interval = const Duration(seconds: 30)}); void start() { _timer ??= Timer.periodic(interval, (_) => onSendHeartbeat()); } void stop() { _timer?.cancel(); _timer = null; } } class RobustLineClient { final String host; final int port; Socket? _socket; StreamSubscription<String>? _sub; final HeartbeatManager _hb; final ReconnectPolicy _policy = ReconnectPolicy(); Timer? _idleTimer; RobustLineClient({required this.host, required this.port}) : _hb = HeartbeatManager(onSendHeartbeat: () {/* later bound */}, interval: const Duration(seconds: 20)); Future<void> start() async { await _connect(); } Future<void> _connect() async { try { _socket = await Socket.connect(host, port, timeout: const Duration(seconds: 5)); print('✅ connected'); _policy.reset(); _hb.start(); _sub = _socket! .transform(utf8.decoder) .transform(const LineSplitter()) .listen(_onLine, onError: _onError, onDone: _onDone); // 心跳绑定到 send _hb.onSendHeartbeat.call = () => send('ping'); _resetIdleTimeout(); } catch (e) { print('🚫 connect failed: $e'); await _scheduleReconnect(); } } void _onLine(String line) { _resetIdleTimeout(); print('📩 $line'); } void _onError(Object e, [StackTrace? st]) { print('❌ $e'); _teardown(); _scheduleReconnect(); } void _onDone() { print('⚠️ closed by server'); _teardown(); _scheduleReconnect(); } void _teardown() { _idleTimer?.cancel(); _idleTimer = null; _hb.stop(); _sub?.cancel(); _sub = null; _socket?.destroy(); _socket = null; } Future<void> _scheduleReconnect() async { final delay = _policy.nextDelay(); print('⏳ reconnect in ${delay.inMilliseconds} ms'); await Future.delayed(delay); await _connect(); } void _resetIdleTimeout() { _idleTimer?.cancel(); _idleTimer = Timer(const Duration(seconds: 60), () { print('⏰ idle timeout -> reconnect'); _teardown(); _scheduleReconnect(); }); } void send(String message) { _socket?.write(message + '\n'); } }
以上就是Flutter中实现TCP通信的关键步骤与代码示例的详细内容,更多关于Flutter实现TCP通信的资料请关注脚本之家其它相关文章!