python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python多线程同时处理多文件

Python多线程如何同时处理多个文件

作者:DS..

这篇文章主要介绍了Python多线程如何同时处理多个文件问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

Python多线程同时处理多个文件

在需要对大量文件进行相同的操作时,逐个遍历是非常耗费时间的。这时,我们可以借助于Python的多线程操作来大大提高处理效率,减少处理时间。

问题背景

比如说,我们现在需要从一个文件夹下面读取出所有的视频,然后对每个视频进行逐帧处理。

由于对视频逐帧处理本身就是比较耗时的任务,如果按照串行的方式顺序处理每个视频文件是效率非常低的,此时,一种比较容易的解决方案是使用Python的多线程进行处理。

定义通用处理函数

并发适合于处理相似的任务。例如我们需要对视频中的每一帧进行处理,处理函数接受的是一个视频名称的列表,对列表内的视频顺序处理,那么我们可以构建以下处理函数:

import cv2
def func(video_names):
    for video_name in video_names:
        cap = cv2.VideoCapture(video_name)
        while True:
            ret, frame = cap.read()
            if ret:
                # process temp frame
            else:
                break

这样,我们只需要将待处理的视频名称按照开启的线程数,划分成多个子列表,就可以进行并发批量处理了。

多线程 Thread

Python中的多线程是通过threading库实现的,除此之外,multiprocessing也可以实现并发处理,二者之间是存在一定差异的。这里我们使用threading来实现同时处理多个不同的文件。

import threading
# video_names_list = [part_names_1_list, part_names_2_list, ..., part_names_k_list]
for part_video in video_names_list:
    thread = threading.Thread(target=func, args=([part_video]))
    thread.start()

在这里,首先将要处理的文件名称列表划分成若干个子列表,然后对每一个子列表开启一个线程进行处理。

Python多线程文件操作

使用python 将在csv文件中的一百万条网址,写入mongo数据库中,这里使用多线程进行操作。

直接贴上代码,如下:

import os
import threading  #导入进程
import csv
import time
from Mongo_cache import MongoCache 
import win32com.client
import winsound
NUM_THREAD = 5
COUNT = 0
lock = threading.Lock()
cache = MongoCache()     #数据库连接初始化
def worker():
"""
func: 从csv文件中读取数据,并将数据返回
"""
    for path in os.listdir(os.getcwd()):
        #print("当前工作目录", path)
        file_name = path.split('.')
        #print(file_name)
        if file_name[-1] == 'csv':
            #print("地址是:",path)
            file = open(path)
            data = csv.reader(file)
            return data
        else:
            pass
def save_info(data,i, num_retries=2):
"""
func: 将数据保存
"""
    global COUNT
    global lock
    global cache
    for _, website in data:
        try:
            lock.acquire()
            #print("线程{}》》》{}正在运行".format(threading.current_thread().name, i))
            item = {'website':website}
            cache(item)
            COUNT += 1
        except:
            if num_retries > 0:
                save_info(data, i, num_retries-1)
        finally:
            lock.release()
def main():
"""
启动线程
"""
    print("start working")
    print("working...")
    data = worker()
    threads = []   #设置主线程
    for i in range(NUM_THREAD):
        t = threading.Thread(target=save_info, args=(data, i))
        threads.append(t)
    for i in range(NUM_THREAD):
        threads[i].start()
    for i in range(NUM_THREAD):
        threads[i].join()
    print("all was done!")
if __name__ == '__main__':
    s_time = time.time()
    main()
    e_time = time.time()
    print("总的信息条数:", COUNT)
    print("总耗时:", e_time-s_time)
    speak = win32com.client.Dispatch('SAPI.SPVOICE')
    speak.Speak("早上好,eric,程序结束!")

数据存储模块

import pickle
import zlib
from bson.binary import Binary 
from datetime import datetime, timedelta
from pymongo import MongoClient
import time
class MongoCache(object):
    def __init__(self, client=None, expires=timedelta(days=30)):
        self.client = MongoClient('localhost', 27017) if client is None else client
        self.db = self.client.cache
        #self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds())  #设置自动删除时间
    def __call__(self,url):
        self.db.webpage.insert(url)
        #print("保存成功")
    def __contains__(self,url):
        try:
            self[url]
        except KeyError:
            return False
        else:
            return True
    def __getitem__(self, url):
        record = self.db.webpage.find_one({'_id':url})
        if record:
            return pickle.loads(zlib.decompress(record['result']))
        else:
            raise KeyError(url + 'dose not exist')
    def __setitem__(self, url, result):
        record = {'result': Binary(zlib.compress(pickle.dumps(result))), 'timestamp':datetime.utcnow()}
        self.db.webpage.update({'_id':url},{'$set':record},upsert=True)
    def clear(self):
        self.db.webpage.drop()

将一百万条网址从csv文件保存到数据库所花费的时间为

start working
working...
all was done!
总的信息条数: 1000000
总耗时: 427.4034459590912

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文