Python多线程如何同时处理多个文件
作者:DS..
这篇文章主要介绍了Python多线程如何同时处理多个文件问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
Python多线程同时处理多个文件
在需要对大量文件进行相同的操作时,逐个遍历是非常耗费时间的。这时,我们可以借助于Python的多线程操作来大大提高处理效率,减少处理时间。
问题背景
比如说,我们现在需要从一个文件夹下面读取出所有的视频,然后对每个视频进行逐帧处理。
由于对视频逐帧处理本身就是比较耗时的任务,如果按照串行的方式顺序处理每个视频文件是效率非常低的,此时,一种比较容易的解决方案是使用Python的多线程进行处理。
定义通用处理函数
并发适合于处理相似的任务。例如我们需要对视频中的每一帧进行处理,处理函数接受的是一个视频名称的列表,对列表内的视频顺序处理,那么我们可以构建以下处理函数:
import cv2 def func(video_names): for video_name in video_names: cap = cv2.VideoCapture(video_name) while True: ret, frame = cap.read() if ret: # process temp frame else: break
这样,我们只需要将待处理的视频名称按照开启的线程数,划分成多个子列表,就可以进行并发批量处理了。
多线程 Thread
Python中的多线程是通过threading库实现的,除此之外,multiprocessing也可以实现并发处理,二者之间是存在一定差异的。这里我们使用threading来实现同时处理多个不同的文件。
import threading # video_names_list = [part_names_1_list, part_names_2_list, ..., part_names_k_list] for part_video in video_names_list: thread = threading.Thread(target=func, args=([part_video])) thread.start()
在这里,首先将要处理的文件名称列表划分成若干个子列表,然后对每一个子列表开启一个线程进行处理。
Python多线程文件操作
使用python 将在csv文件中的一百万条网址,写入mongo数据库中,这里使用多线程进行操作。
直接贴上代码,如下:
import os import threading #导入进程 import csv import time from Mongo_cache import MongoCache import win32com.client import winsound NUM_THREAD = 5 COUNT = 0 lock = threading.Lock() cache = MongoCache() #数据库连接初始化 def worker(): """ func: 从csv文件中读取数据,并将数据返回 """ for path in os.listdir(os.getcwd()): #print("当前工作目录", path) file_name = path.split('.') #print(file_name) if file_name[-1] == 'csv': #print("地址是:",path) file = open(path) data = csv.reader(file) return data else: pass def save_info(data,i, num_retries=2): """ func: 将数据保存 """ global COUNT global lock global cache for _, website in data: try: lock.acquire() #print("线程{}》》》{}正在运行".format(threading.current_thread().name, i)) item = {'website':website} cache(item) COUNT += 1 except: if num_retries > 0: save_info(data, i, num_retries-1) finally: lock.release() def main(): """ 启动线程 """ print("start working") print("working...") data = worker() threads = [] #设置主线程 for i in range(NUM_THREAD): t = threading.Thread(target=save_info, args=(data, i)) threads.append(t) for i in range(NUM_THREAD): threads[i].start() for i in range(NUM_THREAD): threads[i].join() print("all was done!") if __name__ == '__main__': s_time = time.time() main() e_time = time.time() print("总的信息条数:", COUNT) print("总耗时:", e_time-s_time) speak = win32com.client.Dispatch('SAPI.SPVOICE') speak.Speak("早上好,eric,程序结束!")
数据存储模块
import pickle import zlib from bson.binary import Binary from datetime import datetime, timedelta from pymongo import MongoClient import time class MongoCache(object): def __init__(self, client=None, expires=timedelta(days=30)): self.client = MongoClient('localhost', 27017) if client is None else client self.db = self.client.cache #self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds()) #设置自动删除时间 def __call__(self,url): self.db.webpage.insert(url) #print("保存成功") def __contains__(self,url): try: self[url] except KeyError: return False else: return True def __getitem__(self, url): record = self.db.webpage.find_one({'_id':url}) if record: return pickle.loads(zlib.decompress(record['result'])) else: raise KeyError(url + 'dose not exist') def __setitem__(self, url, result): record = {'result': Binary(zlib.compress(pickle.dumps(result))), 'timestamp':datetime.utcnow()} self.db.webpage.update({'_id':url},{'$set':record},upsert=True) def clear(self): self.db.webpage.drop()
将一百万条网址从csv文件保存到数据库所花费的时间为
start working working... all was done! 总的信息条数: 1000000 总耗时: 427.4034459590912
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。