Python异步编程入门之实现文件批处理的并发处理方式
作者:engchina
本文以Python初级程序员为对象,介绍了如何使用asyncio和logging模块实现一个异步批处理文件的并发处理系统,以提高处理大量文件或数据时的效率,其中,通过配置日志系统记录处理文件的日志信息,定义AsyncBatchProcessor类控制并发任务的数量
引言
在现代软件开发中,处理大量文件或数据时,提高处理效率和并发性是非常重要的。
Python 的 asyncio
库提供了一种强大的方式来实现异步编程,从而提高程序的并发处理能力。
本文将面向 Python 初级程序员,介绍如何使用 asyncio
和 logging
模块来实现一个异步批处理文件的并发处理系统。
代码实现
1. 日志配置
首先,我们需要配置日志系统,以便在处理文件时记录日志信息。
日志配置包括设置日志格式和输出位置。
import logging import os # 获取当前文件的绝对路径 current_file = os.path.abspath(__file__) # 配置日志格式 log_format = '%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s' logging.basicConfig(format=log_format, level=logging.INFO) # 创建一个文件处理器,并将日志输出到文件 file_handler = logging.FileHandler('app.log') file_handler.setFormatter(logging.Formatter(log_format)) logging.getLogger().addHandler(file_handler)
2. 异步批处理类
接下来,我们定义一个 AsyncBatchProcessor
类,用于处理批量文件。
该类使用 asyncio.Semaphore
来控制并发任务的数量。
import asyncio import random DEFAULT_MAX_CONCURRENT_TASKS = 2 # 最大并发任务数 MAX_RETRIES = 3 # 最大重试次数 class AsyncBatchProcessor: def __init__(self, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS): self.max_concurrent = max_concurrent self.semaphore = asyncio.Semaphore(max_concurrent) async def process_single_file( self, input_file: str, retry_count: int = 0 ) -> None: """处理单个文件的异步方法""" async with self.semaphore: # 使用信号量控制并发 try: logging.info(f"Processing file: {input_file}") # 模拟文件处理过程 await asyncio.sleep(random.uniform(0.5, 2.0)) logging.info(f"Successfully processed {input_file}") except Exception as e: logging.error(f"Error processing {input_file} of Attempt {retry_count}: {str(e)}") if retry_count < MAX_RETRIES: logging.info(f"Retrying {input_file} (Attempt {retry_count + 1})") await asyncio.sleep(1) await self.process_single_file(input_file, retry_count + 1) else: logging.error(f"Failed to process {input_file} after {MAX_RETRIES} attempts") async def process_batch( self, file_list: list ) -> None: total_files = len(file_list) logging.info(f"Found {total_files} files to process") # 创建工作队列 queue = asyncio.Queue() # 将所有文件放入队列 for file_path in file_list: await queue.put(file_path) # 创建工作协程 async def worker(worker_id: int): while True: try: # 非阻塞方式获取任务 input_file_path = await queue.get() logging.info(f"Worker {worker_id} processing: {input_file_path}") try: await self.process_single_file(input_file_path) except Exception as e: logging.error(f"Error processing {input_file_path}: {str(e)}") finally: queue.task_done() except asyncio.QueueEmpty: # 队列为空,工作结束 break except Exception as e: logging.error(f"Worker {worker_id} encountered error: {str(e)}") break # 创建工作任务 workers = [] for i in range(self.max_concurrent): worker_task = asyncio.create_task(worker(i)) workers.append(worker_task) # 等待队列处理完成 await queue.join() # 取消所有仍在运行的工作任务 for w in workers: w.cancel() # 等待所有工作任务完成 await asyncio.gather(*workers, return_exceptions=True)
3. 异步批处理入口函数
最后,我们定义一个异步批处理入口函数 batch_detect
,用于启动批处理任务。
async def batch_detect( file_list: list, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS ): """异步批处理入口函数""" processor = AsyncBatchProcessor(max_concurrent) await processor.process_batch(file_list) # 示例调用 file_list = ["file1.pdf", "file2.pdf", "file3.pdf", "file4.pdf"] asyncio.run(batch_detect(file_list))
代码解释
1.日志配置:
- 使用
logging
模块记录日志信息,包括时间、日志级别、文件路径和行号、以及日志消息。 - 日志输出到文件
app.log
中,便于后续查看和分析。
2.异步批处理类 AsyncBatchProcessor
:
__init__
方法初始化最大并发任务数和信号量。process_single_file
方法处理单个文件,使用信号量控制并发,模拟文件处理过程,并在失败时重试。process_batch
方法处理批量文件,创建工作队列和协程,控制并发任务的执行。
3.异步批处理入口函数 batch_detect
:
- 创建
AsyncBatchProcessor
实例,并调用process_batch
方法启动批处理任务。
总结
通过使用 asyncio
和 logging
模块,我们实现了一个高效的异步批处理文件系统。
该系统能够并发处理大量文件,并在处理失败时自动重试,直到达到最大重试次数。
日志系统帮助我们记录每个文件的处理过程,便于后续的调试和分析。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。