python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > python处理多线程请求接口结果顺序

python处理多线程请求接口结果顺序的4种方案

作者:用户22176592792

除了“无序收集+统一排序”的方案一,处理多线程请求接口结果顺序的核心思路是 “确保结果与请求提交顺序对齐” ,以下是 4 种实用方案,大家可以根据需要进行选择

除了“无序收集+统一排序”的方案一,处理多线程请求接口结果顺序的核心思路是 “确保结果与请求提交顺序对齐” ,以下是 4 种实用方案(含进阶优化和第三方库方案),覆盖不同场景需求,且均保证线程安全和并发效率

一、方案一:固定位置存储(无排序,高效实时)

核心逻辑

提前创建一个与请求总数长度一致的结果列表,每个线程携带唯一的“请求索引”,执行完成后直接将结果写入列表的对应索引位置(如任务 5 的结果写入 ​​results[5]​​)。由于索引与提交顺序一一对应,所有线程完成后,列表自然是有序的。

关键优势

完整代码

import requests
import threading
import time
from typing import List

API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5

# 1. 初始化:固定长度的结果列表(与请求顺序对应)+ 互斥锁(保护列表写入)
results: List[tuple] = [None] * TOTAL_REQUESTS  # 初始值为 None,完成后写入结果
lock = threading.Lock()  # 线程安全:避免多线程同时修改同一列表位置

def request_api(index: int):
    """线程执行函数:按索引写入结果到固定位置"""
    url = API_URL.format(index % 10 + 1)
    try:
        response = requests.get(url, timeout=TIMEOUT)
        response.raise_for_status()
        result = (index, True, f"响应:{response.json()['title'][:20]}...")
    except Exception as e:
        result = (index, False, f"失败:{str(e)[:30]}")

    # 2. 加锁写入结果(仅锁定写入操作,不影响请求并发)
    with lock:
        results[index] = result  # 关键:按请求索引写入对应位置

if __name__ == "__main__":
    start_time = time.time()
    threads = []

    # 3. 创建并启动线程(控制线程池大小)
    for i in range(TOTAL_REQUESTS):
        # 限制同时运行的线程数,避免创建过多线程
        if len(threads) >= THREAD_NUM:
            # 等待任意线程完成后再创建新线程
            threading.Thread.join(threading.Thread.wait(threads))
            threads = [t for t in threads if t.is_alive()]
        
        t = threading.Thread(target=request_api, args=(i,), name=f"Thread-{i}")
        t.start()
        threads.append(t)

    # 4. 等待所有线程完成
    for t in threads:
        t.join()

    # 5. 直接按列表顺序输出(已与提交顺序一致)
    print("固定位置存储 - 按请求顺序输出:")
    for idx, is_success, msg in results:
        print(f"任务[{idx}]:{'✅' if is_success else '❌'} {msg}")

    print(f"\n总耗时:{round(time.time() - start_time, 3)}s")

二、方案二:队列(Queue)流式有序处理

核心逻辑

用两个线程安全的队列:

关键优势

完整代码

import requests
import threading
from queue import Queue
import time

API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5

def worker(task_queue: Queue, result_queue: Queue):
    """工作线程:从任务队列取任务,执行后存入结果队列"""
    while not task_queue.empty():
        try:
            index = task_queue.get(timeout=1)  # 非阻塞取任务(超时1秒退出)
            url = API_URL.format(index % 10 + 1)
            try:
                response = requests.get(url, timeout=TIMEOUT)
                response.raise_for_status()
                result = (index, True, f"响应:{response.json()['title'][:20]}...")
            except Exception as e:
                result = (index, False, f"失败:{str(e)[:30]}")
            result_queue.put(result)  # 存入结果队列(无序)
        except Exception:
            break

if __name__ == "__main__":
    start_time = time.time()

    # 1. 初始化队列
    task_queue = Queue()  # 按顺序存入请求索引(0~19)
    result_queue = Queue()  # 存储无序的结果

    # 2. 提交任务(按顺序入队)
    for i in range(TOTAL_REQUESTS):
        task_queue.put(i)

    # 3. 启动工作线程
    threads = [threading.Thread(target=worker, args=(task_queue, result_queue)) for _ in range(THREAD_NUM)]
    for t in threads:
        t.start()

    # 4. 主线程按顺序提取结果(流式输出)
    print("队列流式处理 - 按请求顺序实时输出:")
    expected_index = 0  # 期望的下一个任务索引(从0开始)
    completed = 0  # 已完成的任务数

    while completed < TOTAL_REQUESTS:
        if not result_queue.empty():
            index, is_success, msg = result_queue.get()
            # 匹配期望索引则输出,否则放回队列
            if index == expected_index:
                print(f"任务[{index}]:{'✅' if is_success else '❌'} {msg}")
                expected_index += 1
                completed += 1
            else:
                result_queue.put((index, is_success, msg))  # 未匹配则放回
        else:
            time.sleep(0.01)  # 避免空循环占用CPU

    # 5. 等待所有线程结束
    for t in threads:
        t.join()

    print(f"\n总耗时:{round(time.time() - start_time, 3)}s")

三、方案三:使用 ​​concurrent.futures​​ + 有序结果收集

核心逻辑

​ThreadPoolExecutor​​ 提交任务后会返回一个 ​​Future​​ 列表,该列表的顺序与提交顺序一致(即使任务完成顺序无序)。通过遍历 ​​Future​​ 列表(而非 ​​as_completed​​),直接调用 ​​future.result()​​,会按提交顺序阻塞等待每个任务完成,从而自然得到有序结果。

关键优势

完整代码

import requests
from concurrent.futures import ThreadPoolExecutor
import time

API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5

def request_api(index: int) -> tuple:
    """单个请求函数:返回(索引,是否成功,结果信息)"""
    url = API_URL.format(index % 10 + 1)
    try:
        response = requests.get(url, timeout=TIMEOUT)
        response.raise_for_status()
        return (index, True, f"响应:{response.json()['title'][:20]}...")
    except Exception as e:
        return (index, False, f"失败:{str(e)[:30]}")

if __name__ == "__main__":
    start_time = time.time()

    # 1. 提交任务并获取 Future 列表(顺序与提交一致)
    with ThreadPoolExecutor(max_workers=THREAD_NUM) as executor:
        future_list = [executor.submit(request_api, i) for i in range(TOTAL_REQUESTS)]

        # 2. 按 Future 列表顺序获取结果(阻塞等待,顺序与提交一致)
        print("ThreadPoolExecutor 有序收集 - 按请求顺序输出:")
        for future in future_list:
            index, is_success, msg = future.result()  # 按提交顺序阻塞等待
            print(f"任务[{index}]:{'✅' if is_success else '❌'} {msg}")

    total_cost = round(time.time() - start_time, 3)
    print(f"\n总耗时:{total_cost}s")

注意

四、方案四:第三方库 ​​aiohttp​​(异步并发+有序结果)

核心逻辑

虽然是“异步”而非“多线程”,但 ​​aiohttp​​ 是 IO 密集型接口请求的更优选择(单线程异步并发,无 GIL 影响,效率更高),且天然支持有序结果——异步任务的提交顺序与结果返回顺序一致。

关键优势

完整代码

import aiohttp
import asyncio
import time

API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
TOTAL_REQUESTS = 20
TIMEOUT = 5

async def request_api(session: aiohttp.ClientSession, index: int) -> tuple:
    """异步请求函数"""
    url = API_URL.format(index % 10 + 1)
    try:
        async with session.get(url, timeout=TIMEOUT) as response:
            response.raise_for_status()
            data = await response.json()
            return (index, True, f"响应:{data['title'][:20]}...")
    except Exception as e:
        return (index, False, f"失败:{str(e)[:30]}")

async def main():
    start_time = time.time()

    # 1. 创建异步会话(复用连接,提升效率)
    async with aiohttp.ClientSession() as session:
        # 2. 创建所有异步任务(顺序与提交一致)
        tasks = [request_api(session, i) for i in range(TOTAL_REQUESTS)]
        # 3. 并发执行任务,按提交顺序获取结果
        results = await asyncio.gather(*tasks)  # gather 保证结果顺序与任务顺序一致

    # 4. 输出结果(已有序)
    print("aiohttp 异步并发 - 按请求顺序输出:")
    for idx, is_success, msg in results:
        print(f"任务[{idx}]:{'✅' if is_success else '❌'} {msg}")

    total_cost = round(time.time() - start_time, 3)
    print(f"\n总耗时:{total_cost}s")

if __name__ == "__main__":
    # 兼容 Windows 系统
    if __name__ == "__main__":
        if sys.platform == "win32":
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        asyncio.run(main())

依赖安装

pip install aiohttp

各方案对比&选型建议

方案技术依赖核心优势适用场景
固定位置存储threading + lock效率最高、可实时查进度高并发、需跟踪任务状态
队列流式处理threading + Queue流式输出、无需等待所有任务实时展示进度、边请求边处理
ThreadPoolExecutor 有序收集concurrent.futures代码最简单、标准库支持无需实时输出、按顺序处理结果
aiohttp 异步并发aiohttp + asyncio并发效率最高、无线程安全问题高并发接口请求、爬取、压测

面试&实战关键要点

线程安全是前提:修改共享数据(如列表)必须加锁,或使用线程安全的数据结构(如 ​​queue.Queue​​);

有序的核心是“索引绑定” :无论哪种方案,都需要通过“请求索引”关联任务和结果,确保顺序对齐;

IO 密集型优先选异步:​​aiohttp​​ 异步并发效率高于多线程,且天然有序,是接口请求的最优解;

避免过度设计:简单场景用 ​​ThreadPoolExecutor​​ 有序收集(方案四),复杂场景用队列或固定位置存储,无需追求复杂逻辑。

到此这篇关于python处理多线程请求接口结果顺序的4种方案的文章就介绍到这了,更多相关python处理多线程请求接口结果顺序内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文