Building a Real-Time File Transfer Monitoring System with Vue 3 and WebSocket

Author: 北辰alk

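WebSocket is a network protocol for bidirectional communication between a client and a server. It keeps a persistent, full-duplex connection open, so the server can push data to the client without waiting for a request. This article walks through a real-time file transfer monitoring system built with Vue 3 and WebSocket, using Socket.IO on both the client and the server.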

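1. Requirements Analysis and Technology Selection

1.1 Core Requirements of a Real-Time Monitoring System

1.2 Technical Architecture Design

1.3 Technology Stack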

2. Environment Setup and Configuration

2.1 Create the Vue 3 Project

npm create vue@latest
# Select TypeScript, Router, and Pinia when prompted

2.2 Install Core Dependencies

# socket.io-client ships its own TypeScript declarations, so @types/socket.io-client is not needed
npm install socket.io-client echarts element-plus

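2.3 Set Up the WebSocket Server

// server/ws-server.js
const express = require('express')
const { createServer } = require('http')
const { Server } = require('socket.io')

const app = express()
const httpServer = createServer(app)
const io = new Server(httpServer, {
  cors: {
    origin: "http://localhost:5173",
    methods: ["GET", "POST"]
  }
})

// Connection registry: userId -> socket
const activeConnections = new Map()

io.on('connection', (socket) => {
  console.log(`Client connected: ${socket.id}`)

  // Authentication
  socket.on('auth', (token) => {
    // verifyToken is your own authentication logic (not shown in this article)
    const userId = verifyToken(token)
    socket.data.userId = userId // remember the ID so it can be cleaned up on disconnect
    activeConnections.set(userId, socket)
  })

  // Disconnect handling
  socket.on('disconnect', () => {
    // Only remove the entry if it still belongs to this socket
    if (activeConnections.get(socket.data.userId) === socket) {
      activeConnections.delete(socket.data.userId)
    }
  })
})

httpServer.listen(3001)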

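3. Front-End WebSocket Integration

3.1 Wrapping the Socket Service

// src/utils/socket.ts
import { io, Socket } from 'socket.io-client'

// Payload shapes are assumptions; align them with what your server actually sends
export interface TransferData {
  fileId: string
  progress: number // percentage, 0-100
  speed: number // bytes/sec
}

export interface ControlCommand {
  type: 'pause' | 'resume' | 'cancel' | 'progress'
  fileId?: string
  data?: unknown
}

class SocketService {
  private static instance: SocketService
  private socket: Socket | null = null

  // Private so that getInstance() is the only way to create the service
  private constructor() {
    this.initSocket()
  }

  public static getInstance(): SocketService {
    if (!SocketService.instance) {
      SocketService.instance = new SocketService()
    }
    return SocketService.instance
  }

  private initSocket(): void {
    this.socket = io('http://localhost:3001', {
      transports: ['websocket'],
      auth: {
        token: localStorage.getItem('token')
      }
    })

    this.socket.on('connect', () => {
      console.log('WebSocket connected')
    })

    // connect_error fires when the handshake fails (network error or rejected auth)
    this.socket.on('connect_error', (error) => {
      console.error('WebSocket connection error:', error.message)
    })
  }

  // Listen for transfer progress events
  public onFileProgress(callback: (data: TransferData) => void): void {
    this.socket?.on('file-progress', callback)
  }

  // Send a control command
  public sendControl(command: ControlCommand): void {
    this.socket?.emit('transfer-control', command)
  }
}

export const socket = SocketService.getInstance()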
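
TypeScript types are erased at runtime, so the data passed to an `onFileProgress` callback is only as trustworthy as the server that sent it. The sketch below shows one way to validate incoming payloads before they reach the UI. The `TransferData` fields and the helper names `isTransferData` and `guarded` are assumptions made for illustration, not part of the original service.

```typescript
// Assumed payload of the 'file-progress' event
interface TransferData {
  fileId: string
  progress: number // percentage, 0-100
  speed: number // bytes per second
}

// Runtime type guard: checks the shape and the value ranges by hand
function isTransferData(value: unknown): value is TransferData {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    typeof v.fileId === 'string' &&
    typeof v.progress === 'number' &&
    v.progress >= 0 &&
    v.progress <= 100 &&
    typeof v.speed === 'number' &&
    Number.isFinite(v.speed) &&
    v.speed >= 0
  )
}

// Wrap a handler so that malformed payloads are dropped silently
function guarded(handler: (data: TransferData) => void): (raw: unknown) => void {
  return (raw) => {
    if (isTransferData(raw)) handler(raw)
  }
}
```

A wrapped handler can then be registered in place of the raw callback, so a malformed message never reaches the task list.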

4. Building the Real-Time Monitoring UI

4.1 Transfer Task List Component

<template>
  <el-table :data="tasks" style="width: 100%">
    <el-table-column prop="fileName" label="File name" />
    <el-table-column label="Progress">
      <template #default="{ row }">
        <el-progress 
          :percentage="row.progress" 
          :status="getStatus(row)"
        />
      </template>
    </el-table-column>
    <el-table-column label="Speed">
      <template #default="{ row }">
        {{ formatSpeed(row.speed) }}
      </template>
    </el-table-column>
    <el-table-column label="Actions">
      <template #default="{ row }">
        <el-button @click="handlePause(row.id)">Pause</el-button>
      </template>
    </el-table-column>
  </el-table>
</template>
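
The template calls `getStatus` and `formatSpeed`, which the article does not show. The following is a minimal sketch of what they could look like. The `TransferTask` shape and its `failed` flag are assumptions; the returned status strings are the values the Element Plus progress bar accepts.

```typescript
// Hypothetical row shape; field names follow the table columns
interface TransferTask {
  id: string
  fileName: string
  progress: number // percentage, 0-100
  speed: number // bytes per second
  failed?: boolean
}

// Convert bytes/sec into a human-readable string
function formatSpeed(bytesPerSec: number): string {
  if (!Number.isFinite(bytesPerSec) || bytesPerSec <= 0) return '0 B/s'
  const units = ['B/s', 'KB/s', 'MB/s', 'GB/s']
  let value = bytesPerSec
  let index = 0
  // Divide by 1024 until the value fits the current unit
  while (value >= 1024 && index < units.length - 1) {
    value /= 1024
    index++
  }
  return `${value.toFixed(index === 0 ? 0 : 2)} ${units[index]}`
}

// Map a task to an el-progress status ('' means "in progress")
function getStatus(task: TransferTask): '' | 'success' | 'exception' {
  if (task.failed) return 'exception'
  return task.progress >= 100 ? 'success' : ''
}
```

For example, `formatSpeed(1536)` yields `1.50 KB/s`, and a task at 100% progress is rendered with the `success` style.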

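4.2 Real-Time Speed Chart (ECharts Integration)

<script setup lang="ts">
import { ref, onMounted, onBeforeUnmount } from 'vue'
import * as echarts from 'echarts'

// [timestamp in ms, speed in MB/s]; the tuple shape is an assumption
type SpeedPoint = [number, number]

// Bind with <div ref="chartRef" style="height: 300px" /> in the template
const chartRef = ref<HTMLElement>()
let chart: echarts.ECharts | undefined

onMounted(() => {
  if (!chartRef.value) return
  chart = echarts.init(chartRef.value)

  const option = {
    xAxis: { type: 'time' },
    yAxis: { type: 'value', name: 'Speed (MB/s)' },
    series: [{
      data: [],
      type: 'line',
      smooth: true
    }]
  }

  chart.setOption(option)
})

// Release the chart instance when the component is destroyed
onBeforeUnmount(() => chart?.dispose())

// Update the chart data
const updateChart = (newData: SpeedPoint[]) => {
  chart?.setOption({
    series: [{
      data: newData
    }]
  })
}
</script>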
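
A chart that is updated continuously needs a bounded data set, otherwise the series grows for as long as the page stays open. One possible approach is a fixed-size rolling buffer whose contents are passed to `updateChart`. The class name `SpeedSeries` and the default of 60 points are assumptions for this sketch.

```typescript
// [timestamp in ms, speed in MB/s]
type SpeedPoint = [number, number]

class SpeedSeries {
  private points: SpeedPoint[] = []

  constructor(private readonly maxPoints = 60) {}

  // Append one measurement and return the data to hand to the chart
  push(timestamp: number, bytesPerSec: number): SpeedPoint[] {
    const mbPerSec = bytesPerSec / (1024 * 1024)
    this.points.push([timestamp, Number(mbPerSec.toFixed(2))])
    // Drop the oldest point once the buffer is full
    if (this.points.length > this.maxPoints) this.points.shift()
    return [...this.points] // copy so callers cannot mutate the buffer
  }
}
```

Each incoming progress event would call `push` once and forward the returned array to the chart.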

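5. Core File Transfer Logic

5.1 Enhanced File Uploader

// src/utils/uploader.ts
import type { Socket } from 'socket.io-client'

// Minimal typing for the non-standard Network Information API
type NavigatorWithConnection = Navigator & { connection?: { downlink?: number } }

class EnhancedFileUploader {
  private readonly chunkSize: number
  private uploadedBytes = 0
  private startTime = 0

  // The server in section 6.1 listens for Socket.IO 'file-upload' events,
  // so chunks are sent through a Socket.IO client rather than a raw WebSocket.
  constructor(private readonly file: File, private readonly io: Socket) {
    this.chunkSize = this.calculateChunkSize()
  }

  private calculateChunkSize(): number {
    // Scale the chunk size with the estimated downlink (Mbit/s), never below 1 MB
    const baseSize = 1024 * 1024 // 1 MB
    const downlink = (navigator as NavigatorWithConnection).connection?.downlink ?? 0
    return baseSize * Math.max(1, Math.floor(downlink / 5))
  }

  // Send one chunk and resolve with the server acknowledgement
  private sendChunk(chunk: ArrayBuffer): Promise<{ status: string; message?: string }> {
    return new Promise((resolve) => {
      this.io.emit('file-upload', { fileName: this.file.name, chunk }, resolve)
    })
  }

  async startUpload(): Promise<void> {
    this.startTime = Date.now()
    for (let offset = 0; offset < this.file.size; offset += this.chunkSize) {
      // slice() clamps at the end of the file, so the last chunk may be shorter
      const chunk = await this.file.slice(offset, offset + this.chunkSize).arrayBuffer()
      const ack = await this.sendChunk(chunk)
      if (ack.status !== 'success') {
        throw new Error(ack.message ?? 'Upload failed')
      }
      // Count the bytes actually sent, not the nominal chunk size
      this.uploadedBytes += chunk.byteLength
      this.reportProgress()
    }
  }

  private reportProgress(): void {
    const elapsed = (Date.now() - this.startTime) / 1000
    const speed = elapsed > 0 ? this.uploadedBytes / elapsed : 0 // bytes/sec
    const remainingBytes = this.file.size - this.uploadedBytes

    this.io.emit('transfer-control', {
      type: 'progress',
      data: {
        fileId: this.file.name,
        progress: Number(((this.uploadedBytes / this.file.size) * 100).toFixed(1)),
        speed,
        remaining: speed > 0 ? remainingBytes / speed : null // seconds
      }
    })
  }
}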
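
The chunking arithmetic is easy to get wrong at the end of the file, where the final chunk is usually shorter than the rest. The sketch below isolates that arithmetic in pure functions so it can be checked without a browser. The names `ChunkRange`, `planChunks`, and `progressAfter` are introduced here for illustration only.

```typescript
// Byte range of one chunk; `end` is exclusive, like Blob.slice()
interface ChunkRange {
  index: number
  start: number
  end: number
}

// Split a file of `fileSize` bytes into consecutive ranges of at most `chunkSize` bytes
function planChunks(fileSize: number, chunkSize: number): ChunkRange[] {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError('chunkSize must be a positive integer')
  }
  const ranges: ChunkRange[] = []
  for (let start = 0, index = 0; start < fileSize; start += chunkSize, index++) {
    // The last range is clamped to the file size
    ranges.push({ index, start, end: Math.min(start + chunkSize, fileSize) })
  }
  return ranges
}

// Percentage complete after the first `completed` chunks have been sent
function progressAfter(ranges: ChunkRange[], completed: number, fileSize: number): number {
  if (fileSize === 0) return 100
  const sent = ranges
    .slice(0, completed)
    .reduce((sum, r) => sum + (r.end - r.start), 0)
  return Number(((sent / fileSize) * 100).toFixed(1))
}
```

With a 2.5 MB file and 1 MB chunks this produces three ranges, the last one 512 KB long, and the progress reaches exactly 100 after the third chunk.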

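5.2 Speed Calculation Algorithm

class SpeedCalculator {
  private records: Array<{ time: number; bytes: number }> = []
  private readonly windowMs = 10_000

  // `bytes` is the cumulative number of bytes transferred so far
  update(bytes: number): void {
    const now = Date.now()
    this.records.push({ time: now, bytes })
    // Keep only the records from the last 10 seconds
    while (this.records.length > 0 && now - this.records[0].time > this.windowMs) {
      this.records.shift()
    }
  }

  get currentSpeed(): number {
    if (this.records.length < 2) return 0

    const first = this.records[0]
    const last = this.records[this.records.length - 1]
    const timeDiff = (last.time - first.time) / 1000
    const bytesDiff = last.bytes - first.bytes

    // Two records in the same millisecond would otherwise divide by zero
    if (timeDiff <= 0) return 0
    return bytesDiff / timeDiff // bytes/sec
  }
}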
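
To make the sliding-window calculation concrete, here is a small worked sketch that takes timestamps as arguments instead of reading the clock, which makes the arithmetic easy to verify. The function names `windowedSpeed` and `etaSeconds` and the `Sample` shape are assumptions for this example.

```typescript
// One measurement: `bytes` is the cumulative total at `time` (ms)
interface Sample {
  time: number
  bytes: number
}

// Average speed in bytes/sec over the samples that fall inside the window
function windowedSpeed(samples: Sample[], now: number, windowMs = 10_000): number {
  const recent = samples.filter((s) => now - s.time <= windowMs)
  if (recent.length < 2) return 0
  const first = recent[0]
  const last = recent[recent.length - 1]
  const seconds = (last.time - first.time) / 1000
  return seconds > 0 ? (last.bytes - first.bytes) / seconds : 0
}

// Estimated seconds remaining, or null when the speed is unknown
function etaSeconds(totalBytes: number, transferredBytes: number, speed: number): number | null {
  if (speed <= 0) return null
  return Math.max(0, totalBytes - transferredBytes) / speed
}
```

Given samples at 0 s (0 bytes), 5 s (5000 bytes), and 12 s (12000 bytes), evaluating at 12 s discards the first sample because it is older than 10 seconds. The speed is then (12000 - 5000) / 7 = 1000 bytes/sec, and a 20000-byte file has 8 seconds left.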

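6. Real-Time Processing on the Server

6.1 WebSocket Event Handlers

// File transfer event handling
const path = require('path')
const fs = require('fs')

// Assumed upload directory; it must exist before the first chunk arrives
const uploadDir = path.join(__dirname, 'uploads')

io.on('connection', (socket) => {
  // The payload shape { fileName, chunk } is an assumption and must match what the client emits
  socket.on('file-upload', async ({ fileName, chunk }, ack) => {
    try {
      // basename() strips directory parts so a client cannot write outside uploadDir
      const filePath = path.join(uploadDir, path.basename(fileName))
      const buffer = Buffer.from(chunk)
      await fs.promises.appendFile(filePath, buffer)

      ack({
        status: 'success',
        received: buffer.length
      })

      // Broadcast the progress update.
      // calculateProgress and currentSpeed are application helpers not shown in this article.
      io.emit('file-progress', {
        fileId: fileName,
        progress: calculateProgress(),
        speed: currentSpeed
      })
    } catch (error) {
      ack({ status: 'error', message: error.message })
    }
  })

  // Transfer control commands (handlePause, handleResume, and handleCancel are not shown)
  socket.on('transfer-control', (command) => {
    switch (command.type) {
      case 'pause':
        handlePause(command.fileId)
        break
      case 'resume':
        handleResume(command.fileId)
        break
      case 'cancel':
        handleCancel(command.fileId)
        break
    }
  })
})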
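
The pause, resume, and cancel commands only make sense in certain states: a cancelled transfer cannot be resumed, for example. One way to express this is a small transition table that the command handlers consult before acting. The status names and the `applyCommand` helper below are assumptions for illustration; the article does not define how the handlers are implemented.

```typescript
type TransferStatus = 'uploading' | 'paused' | 'completed' | 'cancelled'
type ControlType = 'pause' | 'resume' | 'cancel'

// Allowed transitions: transitionTable[current][command] -> next status
const transitionTable: Record<TransferStatus, Partial<Record<ControlType, TransferStatus>>> = {
  uploading: { pause: 'paused', cancel: 'cancelled' },
  paused: { resume: 'uploading', cancel: 'cancelled' },
  completed: {}, // terminal state
  cancelled: {} // terminal state
}

// Return the next status, or the current one if the command is not allowed
function applyCommand(current: TransferStatus, command: ControlType): TransferStatus {
  return transitionTable[current][command] ?? current
}
```

Ignoring invalid commands instead of throwing keeps the server tolerant of duplicate or late messages, which are common when clients reconnect.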

6.2 Transfer State Management

class TransferManager {
  constructor() {
    this.transfers = new Map()
  }

  addTransfer(fileId, fileSize) {
    this.transfers.set(fileId, {
      fileSize,
      startTime: Date.now(),
      lastUpdate: Date.now(),
      bytesTransferred: 0,
      status: 'uploading'
    })
  }

  updateProgress(fileId, bytes) {
    const transfer = this.transfers.get(fileId)
    if (!transfer) return // unknown or already removed transfer
    transfer.bytesTransferred += bytes
    transfer.lastUpdate = Date.now()
    if (transfer.bytesTransferred >= transfer.fileSize) {
      transfer.status = 'completed'
    }
  }

  getTransferStatus(fileId) {
    return this.transfers.get(fileId) || null
  }
}

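7. Advanced Features

7.1 Automatic Reconnection

// Client-side reconnection logic
import type { Socket } from 'socket.io-client'

class ReconnectManager {
  private retries = 0
  private readonly maxRetries = 5

  constructor(private socket: Socket) {
    // Reset the counter after every successful connection
    socket.on('connect', () => {
      this.retries = 0
    })

    socket.on('disconnect', (reason) => {
      // Socket.IO retries by itself after network failures, but not when the
      // server closes the connection, so only that case is handled manually
      if (reason === 'io server disconnect') {
        this.scheduleReconnect()
      }
    })
  }

  private scheduleReconnect() {
    if (this.retries < this.maxRetries) {
      // Exponential backoff: 1 s, 2 s, 4 s, ... capped at 30 s
      setTimeout(() => {
        this.socket.connect()
        this.retries++
      }, Math.min(1000 * 2 ** this.retries, 30000))
    }
  }
}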
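
The delay used above follows the formula min(1000 × 2^retries, 30000) milliseconds. The short sketch below computes the schedule on its own so the resulting values are visible; `backoffDelay` and `backoffSchedule` are hypothetical helper names.

```typescript
// Delay before reconnection attempt number `retry` (0-based), in milliseconds
function backoffDelay(retry: number, baseMs = 1000, capMs = 30_000): number {
  return Math.min(baseMs * 2 ** retry, capMs)
}

// All delays for a given number of attempts
function backoffSchedule(maxRetries: number): number[] {
  return Array.from({ length: maxRetries }, (_, i) => backoffDelay(i))
}
```

With five retries the delays are 1000, 2000, 4000, 8000, and 16000 ms, about 31 seconds in total. The 30-second cap only takes effect from the sixth attempt onward.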

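7.2 Bandwidth Throttling

// Token bucket: one token per byte, refilled continuously at `capacity` bytes/sec.
// Intended for sequential use, with one consume() call awaited at a time.
class BandwidthThrottle {
  private tokens: number
  private lastFill: number

  constructor(
    private capacity: number // bandwidth limit (bytes/sec), also the maximum burst size
  ) {
    this.tokens = capacity
    this.lastFill = Date.now()
  }

  // Add the tokens earned since the last refill, up to the bucket capacity
  private refill(): void {
    const now = Date.now()
    const earned = ((now - this.lastFill) / 1000) * this.capacity
    this.tokens = Math.min(this.capacity, this.tokens + earned)
    this.lastFill = now
  }

  async consume(bytes: number): Promise<void> {
    this.refill()

    if (this.tokens >= bytes) {
      this.tokens -= bytes
      return
    }

    // Wait exactly long enough to earn the missing tokens
    const waitTime = ((bytes - this.tokens) / this.capacity) * 1000
    await new Promise(resolve => setTimeout(resolve, waitTime))

    // The wait paid for the deficit, so the bucket is now empty
    this.tokens = 0
    this.lastFill = Date.now()
  }
}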
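
Timer-based throttling is hard to reason about by reading alone, so the sketch below restates the token bucket with an injected clock and returns the wait time instead of sleeping. This variant lets the balance go negative and reports how long the caller should pause to pay off the debt. The class name `TokenBucket` and the `reserve` method are assumptions for illustration, not the article's API.

```typescript
class TokenBucket {
  private tokens: number
  private lastRefill: number

  constructor(
    private readonly ratePerSec: number, // refill rate and maximum burst, in bytes
    private readonly now: () => number = Date.now // injectable clock for testing
  ) {
    this.tokens = ratePerSec
    this.lastRefill = now()
  }

  // Reserve `bytes` and return how many milliseconds the caller should wait before sending
  reserve(bytes: number): number {
    const current = this.now()
    const elapsedSec = (current - this.lastRefill) / 1000
    // Refill for the elapsed time, never above the burst size
    this.tokens = Math.min(this.ratePerSec, this.tokens + elapsedSec * this.ratePerSec)
    this.lastRefill = current
    // A negative balance is a debt that is repaid by waiting
    this.tokens -= bytes
    return this.tokens >= 0 ? 0 : Math.ceil((-this.tokens / this.ratePerSec) * 1000)
  }
}
```

At 1000 bytes/sec, reserving 600 bytes from a full bucket needs no wait. Reserving another 900 bytes immediately afterwards overdraws the bucket by 500 bytes and yields a 500 ms wait.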

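7.3 Transfer Quality Monitoring

import type { Socket } from 'socket.io-client'

class NetworkMonitor {
  private latencyHistory: number[] = []
  private timer?: ReturnType<typeof setInterval>

  constructor(
    private socket: Socket,
    // Called when a round trip takes longer than one second
    private onHighLatency: (latency: number) => void = () => {}
  ) {}

  start(): void {
    this.timer = setInterval(async () => {
      const latency = await this.measureLatency()
      this.latencyHistory.push(latency)
      // Keep the history bounded
      if (this.latencyHistory.length > 100) this.latencyHistory.shift()

      if (latency > 1000) {
        this.onHighLatency(latency)
      }
    }, 5000)
  }

  stop(): void {
    if (this.timer) clearInterval(this.timer)
  }

  private measureLatency(): Promise<number> {
    return new Promise((resolve) => {
      const start = Date.now()
      // The server must register a 'ping' handler that invokes the acknowledgement callback
      this.socket.emit('ping', () => {
        resolve(Date.now() - start)
      })
    })
  }
}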
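
The latency history is only useful once it is summarized. The sketch below computes three common figures from a list of round-trip times: the average, the 95th percentile (nearest-rank method), and jitter, taken here as the mean absolute difference between consecutive samples. The function name `summarizeLatency` and the choice of these metrics are assumptions, since the article does not say how the history is used.

```typescript
interface LatencyStats {
  average: number // ms
  p95: number // ms
  jitter: number // ms
}

function summarizeLatency(samples: number[]): LatencyStats | null {
  if (samples.length === 0) return null

  const average = samples.reduce((sum, v) => sum + v, 0) / samples.length

  // Nearest-rank percentile on a sorted copy
  const sorted = [...samples].sort((a, b) => a - b)
  const p95 = sorted[Math.max(0, Math.ceil(sorted.length * 0.95) - 1)]

  // Jitter: mean absolute difference between consecutive samples
  let diffSum = 0
  for (let i = 1; i < samples.length; i++) {
    diffSum += Math.abs(samples[i] - samples[i - 1])
  }
  const jitter = samples.length > 1 ? diffSum / (samples.length - 1) : 0

  return { average, p95, jitter }
}
```

For the samples 100, 140, 100, and 60 ms, the average is 100 ms, the 95th percentile is 140 ms, and the jitter is 40 ms.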

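8. Security and Optimization

8.1 Security Measures

// Server-side middleware
// Assumes the jsonwebtoken and express-rate-limit packages are installed
const jwt = require('jsonwebtoken')
const { rateLimit } = require('express-rate-limit')

const SECRET_KEY = process.env.JWT_SECRET // keep the secret out of source control

const authMiddleware = (socket, next) => {
  try {
    const token = socket.handshake.auth.token
    const decoded = jwt.verify(token, SECRET_KEY)
    socket.user = decoded
    next()
  } catch (error) {
    next(new Error('Authentication failed'))
  }
}
io.use(authMiddleware) // run the check on every incoming connection

// Limits HTTP requests handled by Express; it does not limit socket events
const limiter = rateLimit({
  windowMs: 60 * 1000, // 1 minute
  max: 100 // maximum requests per client within the window
})
app.use(limiter)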

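8.2 Performance Optimization Strategies

// Use MessagePack instead of JSON
// (msgpack stands for whichever MessagePack library you have installed)
socket.emit('binary-data', msgpack.encode(largeData))

// Server-side heartbeat settings.
// Current Socket.IO versions take these as options when the server is created;
// io.set() belonged to very old releases and no longer exists.
const io = new Server(httpServer, {
  pingInterval: 5000,
  pingTimeout: 10000
})

# Use the Redis adapter to share events across multiple server instances
npm install @socket.io/redis-adapter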

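9. Deployment

9.1 Production Architecture

9.2 Nginx Configuration Example

# WebSocket proxy configuration
map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

# Backend pool; the address assumes the Node server from section 2.3 on port 3001
upstream ws_backend {
    server 127.0.0.1:3001;
}

server {
    listen 443 ssl;
    server_name example.com;
    # ssl_certificate and ssl_certificate_key directives are required here

    location /socket.io/ {
        proxy_pass http://ws_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Host $host;
    }
}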

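10. Demonstration and Verification

10.1 Test Scenarios

  1. Normal transfer: upload a 2 GB file and watch the real-time data
  2. Network interruption: disconnect the network, restore it, and check that the upload resumes
  3. Concurrency: upload 10 files at the same time and observe performance
  4. Stress: simulate an environment with 1% packet loss

10.2 Verifying the Monitoring Metrics

Metric                      Expected result
Progress update interval    ≤ 500 ms
Speed calculation error     ≤ 5%
Reconnection time           ≤ 3 s
CPU usage                   ≤ 30%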
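
The targets above can be turned into an automated check. The following sketch compares a set of measured values against those limits and lists the metrics that exceed them. The field names, the `failedMetrics` helper, and the definition of speed error as relative error against a reference speed are assumptions for illustration.

```typescript
interface MeasuredMetrics {
  progressIntervalMs: number // time between progress updates
  speedErrorPct: number // relative error of the reported speed
  reconnectMs: number // time to reconnect after a drop
  cpuPct: number // CPU usage
}

// Upper limits taken from the table of expected results
const metricLimits: MeasuredMetrics = {
  progressIntervalMs: 500,
  speedErrorPct: 5,
  reconnectMs: 3000,
  cpuPct: 30
}

// Relative error of a measured speed against a reference value, in percent
function relativeErrorPct(measured: number, reference: number): number {
  return (Math.abs(measured - reference) * 100) / reference
}

// Names of the metrics that exceed their limit
function failedMetrics(m: MeasuredMetrics): Array<keyof MeasuredMetrics> {
  return (Object.keys(metricLimits) as Array<keyof MeasuredMetrics>).filter(
    (key) => m[key] > metricLimits[key]
  )
}
```

A run that reports a 3.5 s reconnection time with every other value within bounds would return only `reconnectMs`.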

11. Summary and Outlook

11.1 What Was Achieved

11.2 Future Directions
