基于Vue3+WebSocket实现实时文件传输监控系统
作者:北辰alk
WebSocket是一种在客户端和服务器之间进行双向通信的网络协议,它通过建立持久性的、全双工的连接,允许服务器主动向客户端发送数据,本文小编给大家介绍了基于Vue3+WebSocket实现实时文件传输监控系统,需要的朋友可以参考下
一、需求分析与技术选型
1.1 实时监控系统核心需求
- 实时显示文件传输进度
- 动态更新传输速度/剩余时间
- 多任务并行状态管理
- 异常中断实时告警
- 历史传输记录可视化
1.2 技术架构设计

1.3 技术栈说明
- 前端:Vue3 + Element Plus + ECharts
- 实时通信:WebSocket + Socket.IO
- 后端:Node.js + Express + WebSocket
- 数据存储:MongoDB(历史记录)
- 辅助库:dayjs(时间处理)、lodash(数据处理)
二、环境搭建与配置
2.1 创建Vue3项目
npm create vue@latest # 选择TypeScript、Router、Pinia
2.2 安装核心依赖
npm install socket.io-client @types/socket.io-client echarts element-plus
2.3 WebSocket服务端搭建
// server/ws-server.js
const express = require('express')
const { createServer } = require('http')
const { Server } = require('socket.io')
const app = express()
const httpServer = createServer(app)
const io = new Server(httpServer, {
cors: {
origin: "http://localhost:5173",
methods: ["GET", "POST"]
}
})
// 连接管理
const activeConnections = new Map()
io.on('connection', (socket) => {
console.log(`客户端连接: ${socket.id}`)
// 身份验证
socket.on('auth', (token) => {
const userId = verifyToken(token) // 鉴权逻辑
activeConnections.set(userId, socket)
})
// 断开处理
socket.on('disconnect', () => {
activeConnections.delete(socket.userId)
})
})
httpServer.listen(3001)
三、前端WebSocket集成
3.1 Socket服务封装
// src/utils/socket.ts
import { io, Socket } from 'socket.io-client'
class SocketService {
private static instance: SocketService
private socket: Socket | null = null
constructor() {
this.initSocket()
}
public static getInstance(): SocketService {
if (!SocketService.instance) {
SocketService.instance = new SocketService()
}
return SocketService.instance
}
private initSocket(): void {
this.socket = io('http://localhost:3001', {
transports: ['websocket'],
auth: {
token: localStorage.getItem('token')
}
})
this.socket.on('connect', () => {
console.log('WebSocket连接成功')
})
this.socket.on('exception', (error) => {
console.error('WebSocket错误:', error)
})
}
// 监听传输事件
public onFileProgress(callback: (data: TransferData) => void): void {
this.socket?.on('file-progress', callback)
}
// 发送控制命令
public sendControl(command: ControlCommand): void {
this.socket?.emit('transfer-control', command)
}
}
export const socket = SocketService.getInstance()
四、实时监控界面开发
4.1 传输任务列表组件
<template>
<el-table :data="tasks" style="width: 100%">
<el-table-column prop="fileName" label="文件名" />
<el-table-column label="进度">
<template #default="{ row }">
<el-progress
:percentage="row.progress"
:status="getStatus(row)"
/>
</template>
</el-table-column>
<el-table-column label="速度">
<template #default="{ row }">
{{ formatSpeed(row.speed) }}
</template>
</el-table-column>
<el-table-column label="操作">
<template #default="{ row }">
<el-button @click="handlePause(row.id)">暂停</el-button>
</template>
</el-table-column>
</el-table>
</template>
4.2 实时速度图表(ECharts集成)
<script setup lang="ts">
import * as echarts from 'echarts'
const chartRef = ref<HTMLElement>()
let chart: echarts.ECharts
onMounted(() => {
chart = echarts.init(chartRef.value)
const option = {
xAxis: { type: 'time' },
yAxis: { name: '速度 (MB/s)' },
series: [{
data: [],
type: 'line',
smooth: true
}]
}
chart.setOption(option)
})
// 更新图表数据
const updateChart = (newData: SpeedPoint[]) => {
chart.setOption({
series: [{
data: newData
}]
})
}
</script>
五、文件传输核心逻辑
5.1 增强版文件上传器
class EnhancedFileUploader {
private ws: WebSocket
private file: File
private chunkSize: number
private uploadedBytes = 0
constructor(file: File) {
this.file = file
this.chunkSize = this.calculateChunkSize()
this.ws = new WebSocket('ws://localhost:3001/upload')
}
private calculateChunkSize(): number {
// 根据网络速度动态调整分片大小
const baseSize = 1024 * 1024 // 1MB
return navigator.connection?.downlink
? baseSize * Math.floor(navigator.connection.downlink / 5)
: baseSize
}
async startUpload() {
const reader = new FileReader()
let offset = 0
const readChunk = () => {
const chunk = this.file.slice(offset, offset + this.chunkSize)
reader.readAsArrayBuffer(chunk)
}
reader.onload = (e) => {
if (e.target?.result) {
this.ws.send(e.target.result)
this.uploadedBytes += this.chunkSize
this.reportProgress()
if (offset < this.file.size) {
offset += this.chunkSize
readChunk()
}
}
}
readChunk()
}
private reportProgress() {
const progress = (this.uploadedBytes / this.file.size) * 100
const speed = this.calculateSpeed()
socket.sendControl({
type: 'progress',
data: {
fileId: this.file.name,
progress: Number(progress.toFixed(1)),
speed: speed,
remaining: this.calculateRemainingTime(speed)
}
})
}
}
5.2 速度计算算法
class SpeedCalculator {
private records: Array<{ time: number; bytes: number }> = []
update(bytes: number): void {
this.records.push({
time: Date.now(),
bytes: bytes
})
// 保留最近10秒记录
if (this.records.length > 10) {
this.records.shift()
}
}
get currentSpeed(): number {
if (this.records.length < 2) return 0
const first = this.records[0]
const last = this.records[this.records.length - 1]
const timeDiff = (last.time - first.time) / 1000
const bytesDiff = last.bytes - first.bytes
return bytesDiff / timeDiff // bytes/sec
}
}
六、服务端实时处理
6.1 WebSocket事件处理器
// 文件传输事件处理
io.on('connection', (socket) => {
socket.on('file-upload', async (chunk, ack) => {
try {
const filePath = path.join(uploadDir, fileName)
await fs.promises.appendFile(filePath, Buffer.from(chunk))
ack({
status: 'success',
received: chunk.length
})
// 广播进度更新
io.emit('file-progress', {
fileId: fileName,
progress: calculateProgress(),
speed: currentSpeed
})
} catch (error) {
ack({ status: 'error', message: error.message })
}
})
// 传输控制命令
socket.on('transfer-control', (command) => {
switch (command.type) {
case 'pause':
handlePause(command.fileId)
break
case 'resume':
handleResume(command.fileId)
break
case 'cancel':
handleCancel(command.fileId)
break
}
})
})
6.2 传输状态管理
class TransferManager {
constructor() {
this.transfers = new Map()
}
addTransfer(fileId, fileSize) {
this.transfers.set(fileId, {
startTime: Date.now(),
bytesTransferred: 0,
status: 'uploading'
})
}
updateProgress(fileId, bytes) {
const transfer = this.transfers.get(fileId)
transfer.bytesTransferred += bytes
transfer.lastUpdate = Date.now()
}
getTransferStatus(fileId) {
return this.transfers.get(fileId) || null
}
}
七、高级功能实现
7.1 断线自动重连
// 前端重连逻辑
class ReconnectManager {
private retries = 0
private maxRetries = 5
constructor(private socket: Socket) {
socket.on('disconnect', () => {
this.scheduleReconnect()
})
}
private scheduleReconnect() {
if (this.retries < this.maxRetries) {
setTimeout(() => {
this.socket.connect()
this.retries++
}, Math.min(1000 * 2 ** this.retries, 30000))
}
}
}
7.2 带宽节流控制
class BandwidthThrottle {
private tokens: number
private lastFill: number
constructor(
private capacity: number, // 带宽容量(bytes/sec)
private interval = 1000 // 令牌填充间隔
) {
this.tokens = capacity
this.lastFill = Date.now()
}
async consume(bytes: number): Promise<void> {
const now = Date.now()
const elapsed = now - this.lastFill
if (elapsed > this.interval) {
this.tokens = this.capacity
this.lastFill = now
}
if (this.tokens >= bytes) {
this.tokens -= bytes
return
}
const waitTime = (bytes - this.tokens) / this.capacity * 1000
await new Promise(resolve => setTimeout(resolve, waitTime))
this.tokens = this.capacity - (bytes - this.tokens)
this.lastFill = Date.now()
}
}
7.3 传输质量监控
class NetworkMonitor {
private latencyHistory: number[] = []
private packetLossCount = 0
start() {
setInterval(async () => {
const latency = await this.measureLatency()
this.latencyHistory.push(latency)
if (latency > 1000) {
this.emit('high-latency', latency)
}
}, 5000)
}
private measureLatency(): Promise<number> {
return new Promise((resolve) => {
const start = Date.now()
socket.emit('ping', () => {
resolve(Date.now() - start)
})
})
}
}
八、安全与优化
8.1 安全防护措施
- 传输加密:启用WSS协议
- 请求验证:
// 服务端中间件
const authMiddleware = (socket, next) => {
try {
const token = socket.handshake.auth.token
const decoded = jwt.verify(token, SECRET_KEY)
socket.user = decoded
next()
} catch (error) {
next(new Error('认证失败'))
}
}
- DDOS防护:限制连接频率
const limiter = rateLimit({
windowMs: 60 * 1000, // 1分钟
max: 100 // 最大连接数
})
8.2 性能优化策略
- 二进制传输优化:
// 使用MessagePack替代JSON
socket.emit('binary-data', msgpack.encode(largeData))
- 心跳检测机制:
// 服务端设置
io.set('heartbeat interval', 5000)
io.set('heartbeat timeout', 10000)
- 集群部署:
# 使用Redis适配器 npm install @socket.io/redis-adapter
九、项目部署方案
9.1 生产环境架构

9.2 Nginx配置示例
# WebSocket代理配置
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 443 ssl;
server_name example.com;
location /socket.io/ {
proxy_pass http://ws_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_set_header Host $host;
}
}
十、效果演示与验证
10.1 测试场景设计
- 正常传输测试:上传2GB文件观察实时数据
- 网络中断测试:断开网络后恢复观察续传
- 多并发测试:同时上传10个文件观察性能
- 极限测试:模拟1%丢包率环境
10.2 监控指标验证
| 指标 | 预期结果 |
|---|---|
| 进度更新频率 | ≤500ms |
| 速度计算误差 | ≤5% |
| 断线重连时间 | ≤3s |
| CPU占用率 | ≤30% |
十一、总结与展望
11.1 实现成果
- 完整的实时文件传输监控系统
- 企业级WebSocket应用架构
- 生产环境部署方案
- 全面的异常处理机制
11.2 未来扩展方向
- 集成P2P传输协议
- 添加AI预测传输时间
- 实现跨设备同步传输
- 开发移动端适配版本
