Python多线程如何同时处理多个文件
作者:DS..
这篇文章主要介绍了Python多线程如何同时处理多个文件问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
Python多线程同时处理多个文件
在需要对大量文件进行相同的操作时,逐个遍历是非常耗费时间的。这时,我们可以借助于Python的多线程操作来大大提高处理效率,减少处理时间。
问题背景
比如说,我们现在需要从一个文件夹下面读取出所有的视频,然后对每个视频进行逐帧处理。
由于对视频逐帧处理本身就是比较耗时的任务,如果按照串行的方式顺序处理每个视频文件是效率非常低的,此时,一种比较容易的解决方案是使用Python的多线程进行处理。
定义通用处理函数
并发适合于处理相似的任务。例如我们需要对视频中的每一帧进行处理,处理函数接受的是一个视频名称的列表,对列表内的视频顺序处理,那么我们可以构建以下处理函数:
import cv2 def func(video_names): for video_name in video_names: cap = cv2.VideoCapture(video_name) while True: ret, frame = cap.read() if ret: # process temp frame else: break
这样,我们只需要将待处理的视频名称按照开启的线程数,划分成多个子列表,就可以进行并发批量处理了。
多线程 Thread
Python中的多线程是通过threading库实现的,除此之外,multiprocessing也可以实现并发处理,二者之间是存在一定差异的。这里我们使用threading来实现同时处理多个不同的文件。
import threading # video_names_list = [part_names_1_list, part_names_2_list, ..., part_names_k_list] for part_video in video_names_list: thread = threading.Thread(target=func, args=([part_video])) thread.start()
在这里,首先将要处理的文件名称列表划分成若干个子列表,然后对每一个子列表开启一个线程进行处理。
Python多线程文件操作
使用python 将在csv文件中的一百万条网址,写入mongo数据库中,这里使用多线程进行操作。
直接贴上代码,如下:
import os
import threading #导入进程
import csv
import time
from Mongo_cache import MongoCache
import win32com.client
import winsound
NUM_THREAD = 5
COUNT = 0
lock = threading.Lock()
cache = MongoCache() #数据库连接初始化
def worker():
"""
func: 从csv文件中读取数据,并将数据返回
"""
for path in os.listdir(os.getcwd()):
#print("当前工作目录", path)
file_name = path.split('.')
#print(file_name)
if file_name[-1] == 'csv':
#print("地址是:",path)
file = open(path)
data = csv.reader(file)
return data
else:
pass
def save_info(data,i, num_retries=2):
"""
func: 将数据保存
"""
global COUNT
global lock
global cache
for _, website in data:
try:
lock.acquire()
#print("线程{}》》》{}正在运行".format(threading.current_thread().name, i))
item = {'website':website}
cache(item)
COUNT += 1
except:
if num_retries > 0:
save_info(data, i, num_retries-1)
finally:
lock.release()
def main():
"""
启动线程
"""
print("start working")
print("working...")
data = worker()
threads = [] #设置主线程
for i in range(NUM_THREAD):
t = threading.Thread(target=save_info, args=(data, i))
threads.append(t)
for i in range(NUM_THREAD):
threads[i].start()
for i in range(NUM_THREAD):
threads[i].join()
print("all was done!")
if __name__ == '__main__':
s_time = time.time()
main()
e_time = time.time()
print("总的信息条数:", COUNT)
print("总耗时:", e_time-s_time)
speak = win32com.client.Dispatch('SAPI.SPVOICE')
speak.Speak("早上好,eric,程序结束!")数据存储模块
import pickle
import zlib
from bson.binary import Binary
from datetime import datetime, timedelta
from pymongo import MongoClient
import time
class MongoCache(object):
def __init__(self, client=None, expires=timedelta(days=30)):
self.client = MongoClient('localhost', 27017) if client is None else client
self.db = self.client.cache
#self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds()) #设置自动删除时间
def __call__(self,url):
self.db.webpage.insert(url)
#print("保存成功")
def __contains__(self,url):
try:
self[url]
except KeyError:
return False
else:
return True
def __getitem__(self, url):
record = self.db.webpage.find_one({'_id':url})
if record:
return pickle.loads(zlib.decompress(record['result']))
else:
raise KeyError(url + 'dose not exist')
def __setitem__(self, url, result):
record = {'result': Binary(zlib.compress(pickle.dumps(result))), 'timestamp':datetime.utcnow()}
self.db.webpage.update({'_id':url},{'$set':record},upsert=True)
def clear(self):
self.db.webpage.drop()将一百万条网址从csv文件保存到数据库所花费的时间为
start working working... all was done! 总的信息条数: 1000000 总耗时: 427.4034459590912
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
