基于Vue3+WebSocket实现实时文件传输监控系统
作者:北辰alk
WebSocket是一种在客户端和服务器之间进行双向通信的网络协议,它通过建立持久性的、全双工的连接,允许服务器主动向客户端发送数据,本文小编给大家介绍了基于Vue3+WebSocket实现实时文件传输监控系统,需要的朋友可以参考下
一、需求分析与技术选型
1.1 实时监控系统核心需求
- 实时显示文件传输进度
- 动态更新传输速度/剩余时间
- 多任务并行状态管理
- 异常中断实时告警
- 历史传输记录可视化
1.2 技术架构设计
1.3 技术栈说明
- 前端:Vue3 + Element Plus + ECharts
- 实时通信:WebSocket + Socket.IO
- 后端:Node.js + Express + WebSocket
- 数据存储:MongoDB(历史记录)
- 辅助库:dayjs(时间处理)、lodash(数据处理)
二、环境搭建与配置
2.1 创建Vue3项目
npm create vue@latest # 选择TypeScript、Router、Pinia
2.2 安装核心依赖
npm install socket.io-client @types/socket.io-client echarts element-plus
2.3 WebSocket服务端搭建
// server/ws-server.js const express = require('express') const { createServer } = require('http') const { Server } = require('socket.io') const app = express() const httpServer = createServer(app) const io = new Server(httpServer, { cors: { origin: "http://localhost:5173", methods: ["GET", "POST"] } }) // 连接管理 const activeConnections = new Map() io.on('connection', (socket) => { console.log(`客户端连接: ${socket.id}`) // 身份验证 socket.on('auth', (token) => { const userId = verifyToken(token) // 鉴权逻辑 activeConnections.set(userId, socket) }) // 断开处理 socket.on('disconnect', () => { activeConnections.delete(socket.userId) }) }) httpServer.listen(3001)
三、前端WebSocket集成
3.1 Socket服务封装
// src/utils/socket.ts import { io, Socket } from 'socket.io-client' class SocketService { private static instance: SocketService private socket: Socket | null = null constructor() { this.initSocket() } public static getInstance(): SocketService { if (!SocketService.instance) { SocketService.instance = new SocketService() } return SocketService.instance } private initSocket(): void { this.socket = io('http://localhost:3001', { transports: ['websocket'], auth: { token: localStorage.getItem('token') } }) this.socket.on('connect', () => { console.log('WebSocket连接成功') }) this.socket.on('exception', (error) => { console.error('WebSocket错误:', error) }) } // 监听传输事件 public onFileProgress(callback: (data: TransferData) => void): void { this.socket?.on('file-progress', callback) } // 发送控制命令 public sendControl(command: ControlCommand): void { this.socket?.emit('transfer-control', command) } } export const socket = SocketService.getInstance()
四、实时监控界面开发
4.1 传输任务列表组件
<template> <el-table :data="tasks" style="width: 100%"> <el-table-column prop="fileName" label="文件名" /> <el-table-column label="进度"> <template #default="{ row }"> <el-progress :percentage="row.progress" :status="getStatus(row)" /> </template> </el-table-column> <el-table-column label="速度"> <template #default="{ row }"> {{ formatSpeed(row.speed) }} </template> </el-table-column> <el-table-column label="操作"> <template #default="{ row }"> <el-button @click="handlePause(row.id)">暂停</el-button> </template> </el-table-column> </el-table> </template>
4.2 实时速度图表(ECharts集成)
<script setup lang="ts"> import * as echarts from 'echarts' const chartRef = ref<HTMLElement>() let chart: echarts.ECharts onMounted(() => { chart = echarts.init(chartRef.value) const option = { xAxis: { type: 'time' }, yAxis: { name: '速度 (MB/s)' }, series: [{ data: [], type: 'line', smooth: true }] } chart.setOption(option) }) // 更新图表数据 const updateChart = (newData: SpeedPoint[]) => { chart.setOption({ series: [{ data: newData }] }) } </script>
五、文件传输核心逻辑
5.1 增强版文件上传器
class EnhancedFileUploader { private ws: WebSocket private file: File private chunkSize: number private uploadedBytes = 0 constructor(file: File) { this.file = file this.chunkSize = this.calculateChunkSize() this.ws = new WebSocket('ws://localhost:3001/upload') } private calculateChunkSize(): number { // 根据网络速度动态调整分片大小 const baseSize = 1024 * 1024 // 1MB return navigator.connection?.downlink ? baseSize * Math.floor(navigator.connection.downlink / 5) : baseSize } async startUpload() { const reader = new FileReader() let offset = 0 const readChunk = () => { const chunk = this.file.slice(offset, offset + this.chunkSize) reader.readAsArrayBuffer(chunk) } reader.onload = (e) => { if (e.target?.result) { this.ws.send(e.target.result) this.uploadedBytes += this.chunkSize this.reportProgress() if (offset < this.file.size) { offset += this.chunkSize readChunk() } } } readChunk() } private reportProgress() { const progress = (this.uploadedBytes / this.file.size) * 100 const speed = this.calculateSpeed() socket.sendControl({ type: 'progress', data: { fileId: this.file.name, progress: Number(progress.toFixed(1)), speed: speed, remaining: this.calculateRemainingTime(speed) } }) } }
5.2 速度计算算法
class SpeedCalculator { private records: Array<{ time: number; bytes: number }> = [] update(bytes: number): void { this.records.push({ time: Date.now(), bytes: bytes }) // 保留最近10秒记录 if (this.records.length > 10) { this.records.shift() } } get currentSpeed(): number { if (this.records.length < 2) return 0 const first = this.records[0] const last = this.records[this.records.length - 1] const timeDiff = (last.time - first.time) / 1000 const bytesDiff = last.bytes - first.bytes return bytesDiff / timeDiff // bytes/sec } }
六、服务端实时处理
6.1 WebSocket事件处理器
// 文件传输事件处理 io.on('connection', (socket) => { socket.on('file-upload', async (chunk, ack) => { try { const filePath = path.join(uploadDir, fileName) await fs.promises.appendFile(filePath, Buffer.from(chunk)) ack({ status: 'success', received: chunk.length }) // 广播进度更新 io.emit('file-progress', { fileId: fileName, progress: calculateProgress(), speed: currentSpeed }) } catch (error) { ack({ status: 'error', message: error.message }) } }) // 传输控制命令 socket.on('transfer-control', (command) => { switch (command.type) { case 'pause': handlePause(command.fileId) break case 'resume': handleResume(command.fileId) break case 'cancel': handleCancel(command.fileId) break } }) })
6.2 传输状态管理
class TransferManager { constructor() { this.transfers = new Map() } addTransfer(fileId, fileSize) { this.transfers.set(fileId, { startTime: Date.now(), bytesTransferred: 0, status: 'uploading' }) } updateProgress(fileId, bytes) { const transfer = this.transfers.get(fileId) transfer.bytesTransferred += bytes transfer.lastUpdate = Date.now() } getTransferStatus(fileId) { return this.transfers.get(fileId) || null } }
七、高级功能实现
7.1 断线自动重连
// 前端重连逻辑 class ReconnectManager { private retries = 0 private maxRetries = 5 constructor(private socket: Socket) { socket.on('disconnect', () => { this.scheduleReconnect() }) } private scheduleReconnect() { if (this.retries < this.maxRetries) { setTimeout(() => { this.socket.connect() this.retries++ }, Math.min(1000 * 2 ** this.retries, 30000)) } } }
7.2 带宽节流控制
class BandwidthThrottle { private tokens: number private lastFill: number constructor( private capacity: number, // 带宽容量(bytes/sec) private interval = 1000 // 令牌填充间隔 ) { this.tokens = capacity this.lastFill = Date.now() } async consume(bytes: number): Promise<void> { const now = Date.now() const elapsed = now - this.lastFill if (elapsed > this.interval) { this.tokens = this.capacity this.lastFill = now } if (this.tokens >= bytes) { this.tokens -= bytes return } const waitTime = (bytes - this.tokens) / this.capacity * 1000 await new Promise(resolve => setTimeout(resolve, waitTime)) this.tokens = this.capacity - (bytes - this.tokens) this.lastFill = Date.now() } }
7.3 传输质量监控
class NetworkMonitor { private latencyHistory: number[] = [] private packetLossCount = 0 start() { setInterval(async () => { const latency = await this.measureLatency() this.latencyHistory.push(latency) if (latency > 1000) { this.emit('high-latency', latency) } }, 5000) } private measureLatency(): Promise<number> { return new Promise((resolve) => { const start = Date.now() socket.emit('ping', () => { resolve(Date.now() - start) }) }) } }
八、安全与优化
8.1 安全防护措施
- 传输加密:启用WSS协议
- 请求验证:
// 服务端中间件 const authMiddleware = (socket, next) => { try { const token = socket.handshake.auth.token const decoded = jwt.verify(token, SECRET_KEY) socket.user = decoded next() } catch (error) { next(new Error('认证失败')) } }
- DDOS防护:限制连接频率
const limiter = rateLimit({ windowMs: 60 * 1000, // 1分钟 max: 100 // 最大连接数 })
8.2 性能优化策略
- 二进制传输优化:
// 使用MessagePack替代JSON socket.emit('binary-data', msgpack.encode(largeData))
- 心跳检测机制:
// 服务端设置 io.set('heartbeat interval', 5000) io.set('heartbeat timeout', 10000)
- 集群部署:
# 使用Redis适配器 npm install @socket.io/redis-adapter
九、项目部署方案
9.1 生产环境架构
9.2 Nginx配置示例
# WebSocket代理配置 map $http_upgrade $connection_upgrade { default upgrade; '' close; } server { listen 443 ssl; server_name example.com; location /socket.io/ { proxy_pass http://ws_backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Host $host; } }
十、效果演示与验证
10.1 测试场景设计
- 正常传输测试:上传2GB文件观察实时数据
- 网络中断测试:断开网络后恢复观察续传
- 多并发测试:同时上传10个文件观察性能
- 极限测试:模拟1%丢包率环境
10.2 监控指标验证
指标 | 预期结果 |
---|---|
进度更新频率 | ≤500ms |
速度计算误差 | ≤5% |
断线重连时间 | ≤3s |
CPU占用率 | ≤30% |
十一、总结与展望
11.1 实现成果
- 完整的实时文件传输监控系统
- 企业级WebSocket应用架构
- 生产环境部署方案
- 全面的异常处理机制
11.2 未来扩展方向
- 集成P2P传输协议
- 添加AI预测传输时间
- 实现跨设备同步传输
- 开发移动端适配版本