python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python多线程下载

基于Python实现多线程下载并显示统计进度

作者:宏权实验室

这篇文章主要为大家详细介绍了如何基于Python编写一个多线程下载监控脚本,支持多线程下载APK文件并提供实时监控界面,有需要的小伙伴可以了解下

简介:

这是一个Python多线程下载监控脚本,主要功能包括: 实现多线程下载APK文件,支持50个下载任务和8个并发线程 提供实时监控界面,显示: 总体进度条和完成百分比 成功/失败任务统计 每个线程的状态、当前任务、进度、下载速度和下载量 采用线程安全的数据结构管理下载状态 支持流式下载和进度更新(每0.1秒更新一次) 包含下载统计功能:总用时、平均速度、吞吐量等 脚本使用requests库进行下载,threading实现多线程,Queue管理任务队列,并通过dataclass存储线程状态。监控界面采用终端表格

效果:

实现脚本:

downapk_thread_100.py

import requests
import threading
import time
import sys
from queue import Queue
from dataclasses import dataclass
from typing import Dict, List
import json


@dataclass
class ThreadStatus:
    thread_id: int
    current_task: int = 0
    status: str = "空闲"
    progress: float = 0
    speed: float = 0
    downloaded: int = 0
    start_time: float = 0


class SimpleMultiDownloader:
    def __init__(self, url, total_tasks=100, max_threads=10):
        self.url = url
        self.total_tasks = total_tasks
        self.max_threads = max_threads
        self.task_queue = Queue()

        # 初始化线程状态
        self.threads_status: Dict[int, ThreadStatus] = {}
        for i in range(1, max_threads + 1):
            self.threads_status[i] = ThreadStatus(thread_id=i)

        # 全局统计
        self.global_stats = {
            'completed': 0,
            'failed': 0,
            'total_size': 0,
            'start_time': time.time(),
            'speeds': []
        }
        self.lock = threading.Lock()
        self.display_active = True

    def clear_screen(self):
        """清屏"""
        os.system('cls' if os.name == 'nt' else 'clear')

    def display_status(self):
        """显示状态表格"""
        while self.display_active:
            self.clear_screen()

            print("┌" + "─" * 78 + "┐")
            print("│" + "多线程下载监控".center(78) + "│")
            print("├" + "─" * 78 + "┤")

            # 全局统计
            with self.lock:
                completed = self.global_stats['completed']
                failed = self.global_stats['failed']
                total_done = completed + failed
                elapsed = time.time() - self.global_stats['start_time']

                if completed > 0:
                    avg_speed = sum(self.global_stats['speeds']) / len(self.global_stats['speeds'])
                else:
                    avg_speed = 0

            progress = total_done / self.total_tasks * 100
            bar_length = 40
            filled = int(bar_length * progress / 100)
            progress_bar = "█" * filled + "░" * (bar_length - filled)

            print(f"│ 总进度: [{progress_bar}] {progress:6.1f}%")
            print(f"│ 成功: {completed:4d}  失败: {failed:4d}  总计: {total_done:4d}/{self.total_tasks:4d}")
            print(f"│ 用时: {elapsed:7.1f}s   平均速度: {avg_speed:7.1f} KB/s")
            print("├" + "─" * 78 + "┤")
            print("│ 线程ID │   任务   │       状态       │   进度   │   速度   │   下载量   │")
            print("├" + "─" * 78 + "┤")

            # 显示每个线程状态
            with self.lock:
                for thread_id in sorted(self.threads_status.keys()):
                    status = self.threads_status[thread_id]

                    # 状态颜色/图标
                    if status.status == "完成":
                        status_icon = "✅"
                    elif "失败" in status.status:
                        status_icon = "❌"
                    elif status.status == "下载中":
                        status_icon = "⏬"
                    else:
                        status_icon = "⏳"

                    # 格式化显示
                    task_str = f"{status.current_task:4d}" if status.current_task > 0 else "等待"
                    status_str = f"{status_icon} {status.status[:12]:12s}"
                    progress_str = f"{status.progress:6.1f}%" if status.progress > 0 else "等待"
                    speed_str = f"{status.speed:7.1f}" if status.speed > 0 else "等待"
                    size_str = f"{status.downloaded / 1024:7.1f}KB" if status.downloaded > 0 else "等待"

                    print(
                        f"│ {thread_id:6d} │ {task_str:8s} │ {status_str:16s} │ {progress_str:8s} │ {speed_str:8s} │ {size_str:10s} │")

            print("└" + "─" * 78 + "┘")
            print("按 Ctrl+C 退出监控 (下载继续)")

            # 检查是否完成
            if total_done >= self.total_tasks:
                print("\n✅ 所有任务完成!")
                self.display_active = False
                break

            time.sleep(0.3)

    def download_worker(self, thread_id):
        """工作线程"""
        while True:
            try:
                task_id = self.task_queue.get(timeout=1)

                # 更新线程状态
                with self.lock:
                    self.threads_status[thread_id].current_task = task_id
                    self.threads_status[thread_id].status = "准备中"
                    self.threads_status[thread_id].start_time = time.time()

                # 执行下载
                success = self.perform_download(thread_id, task_id)

                # 更新全局统计
                with self.lock:
                    if success:
                        self.global_stats['completed'] += 1
                    else:
                        self.global_stats['failed'] += 1

                self.task_queue.task_done()

            except:
                break

    def perform_download(self, thread_id, task_id):
        """执行下载任务"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': '*/*',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Referer': 'https://fly.walkera.cn/'
            }

            with self.lock:
                self.threads_status[thread_id].status = "连接中"

            # 获取文件大小
            response = requests.get(self.url, headers=headers, stream=True, timeout=30)
            response.raise_for_status()

            # 获取文件大小
            file_size = int(response.headers.get('content-length', 0))

            downloaded = 0
            start_time = time.time()
            last_update = start_time

            with self.lock:
                self.threads_status[thread_id].status = "下载中"

            # 流式下载
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    downloaded += len(chunk)

                    # 更新显示(限制更新频率)
                    current_time = time.time()
                    if current_time - last_update > 0.1:  # 每0.1秒更新一次
                        elapsed = current_time - start_time
                        speed = downloaded / elapsed / 1024 if elapsed > 0 else 0
                        progress = (downloaded / file_size * 100) if file_size > 0 else 0

                        with self.lock:
                            self.threads_status[thread_id].progress = progress
                            self.threads_status[thread_id].speed = speed
                            self.threads_status[thread_id].downloaded = downloaded

                        last_update = current_time

            # 下载完成
            end_time = time.time()
            download_time = end_time - start_time
            speed = downloaded / download_time / 1024 if download_time > 0 else 0

            with self.lock:
                self.threads_status[thread_id].status = "完成"
                self.threads_status[thread_id].progress = 100
                self.threads_status[thread_id].speed = speed
                self.global_stats['total_size'] += downloaded
                self.global_stats['speeds'].append(speed)

            return True

        except Exception as e:
            with self.lock:
                self.threads_status[thread_id].status = f"失败: {str(e)[:15]}"
                self.threads_status[thread_id].progress = -1

            return False

    def run(self):
        """运行下载器"""
        print("开始多线程下载...")
        print(f"URL: {self.url}")
        print(f"任务数: {self.total_tasks}")
        print(f"线程数: {self.max_threads}")

        # 添加任务到队列
        for i in range(self.total_tasks):
            self.task_queue.put(i + 1)

        # 启动显示线程
        display_thread = threading.Thread(target=self.display_status, daemon=True)
        display_thread.start()

        # 启动工作线程
        threads = []
        for i in range(self.max_threads):
            thread = threading.Thread(target=self.download_worker, args=(i + 1,))
            thread.daemon = True
            thread.start()
            threads.append(thread)

        try:
            # 等待所有任务完成
            self.task_queue.join()

            # 等待显示线程结束
            display_thread.join(timeout=2)

        except KeyboardInterrupt:
            print("\n用户中断")
        finally:
            self.display_active = False

        # 显示最终统计
        self.show_summary()

    def show_summary(self):
        """显示最终统计"""
        total_time = time.time() - self.global_stats['start_time']

        print("\n" + "=" * 60)
        print("下载完成! 最终统计:")
        print("=" * 60)

        with self.lock:
            completed = self.global_stats['completed']
            failed = self.global_stats['failed']

            print(f"总任务数: {self.total_tasks}")
            print(f"成功: {completed} ({completed / self.total_tasks * 100:.1f}%)")
            print(f"失败: {failed} ({failed / self.total_tasks * 100:.1f}%)")
            print(f"总用时: {total_time:.2f} 秒")
            print(f"总下载量: {self.global_stats['total_size'] / 1024 / 1024:.2f} MB")

            if completed > 0:
                avg_speed = sum(self.global_stats['speeds']) / len(self.global_stats['speeds'])
                print(f"平均速度: {avg_speed:.1f} KB/s")
                print(f"吞吐量: {self.global_stats['total_size'] / total_time / 1024:.1f} KB/s")


# 使用示例
if __name__ == "__main__":
    import os

    url = "https://fly.walkera.cn/fd/download?name=WKFLY-1.3.72-1018-release.apk"

    downloader = SimpleMultiDownloader(
        url=url,
        total_tasks=50,  # 减少任务数便于观察
        max_threads=8  # 线程数
    )

    downloader.run()

到此这篇关于基于Python实现多线程下载并显示统计进度的文章就介绍到这了,更多相关Python多线程下载内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文