python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python中的Joblib库

Python中的Joblib库使用学习总结

作者:酒酿小小丸子

这篇文章主要介绍了Python中的Joblib库使用学习总结,Joblib是一组在Python中提供轻量级流水线的工具,Joblib已被优化得很快速,很健壮了,特别是在大数据上,并对numpy数组进行了特定的优化,需要的朋友可以参考下

Joblib

实践环境

简介

Joblib是一组在Python中提供轻量级流水线的工具。特别是:

Joblib已被优化得很快速,很健壮了,特别是在大数据上,并对numpy数组进行了特定的优化。

主要功能

1.输出值的透明快速磁盘缓存(Transparent and fast disk-caching of output value):

Python函数的内存化或类似make的功能,适用于任意Python对象,包括非常大的numpy数组。

通过将操作写成一组具有定义良好的输入和输出的步骤:Python函数,将持久性和流执行逻辑与域逻辑或算法代码分离开来。

Joblib可以将其计算保存到磁盘上,并仅在必要时重新运行:

>>> from joblib import Memory
>>> cachedir = 'your_cache_dir_goes_here'
>>> mem = Memory(cachedir)
>>> import numpy as np
>>> a = np.vander(np.arange(3)).astype(float)
>>> square = mem.cache(np.square)
>>> b = square(a)                                   
______________________________________________________________________...
[Memory] Calling square...
square(array([[0., 0., 1.],
       [1., 1., 1.],
       [4., 2., 1.]]))
_________________________________________________...square - ...s, 0.0min
>>> c = square(a)
# The above call did not trigger an evaluation

2.并行助手(parallel helper):

轻松编写可读的并行代码并快速调试

>>> from joblib import Parallel, delayed
>>> from math import sqrt
>>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> res = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
>>> res
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

3.快速压缩的持久化(Fast compressed Persistence):

代替pickle在包含大数据的Python对象上高效工作( joblib.dump & joblib.load )。

parallel for loops

常见用法

Joblib提供了一个简单的助手类,用于使用多进程为循环实现并行。核心思想是将要执行的代码编写为生成器表达式,并将其转换为并行计算

>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

使用以下代码,可以分布到2个CPU上:

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

输出可以是一个生成器,在可以获取结果时立即返回结果,即使后续任务尚未完成。输出的顺序始终与输入的顺序相匹配:输出的顺序总是匹配输入的顺序:

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> parallel = Parallel(n_jobs=2, return_generator=True) # py3.7往后版本才支持return_generator参数
>>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
>>> print(type(output_generator))
<class 'generator'>
>>> print(next(output_generator))
0.0
>>> print(next(output_generator))
1.0
>>> print(list(output_generator))
[2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

此生成器允许减少joblib.Parallel的内存占用调用

基于线程的并行VS基于进程的并行

默认情况下, joblib.Parallel 使用 'loky' 后端模块启动单独的Python工作进程,以便在分散的CPU上同时执行任务。

对于一般的Python程序来说,这是一个合理的默认值,但由于输入和输出数据需要在队列中序列化以便同工作进程进行通信,因此可能会导致大量开销(请参阅序列化和进程)。

当你知道你调用的函数是基于一个已编译的扩展,并且该扩展在大部分计算过程中释放了Python全局解释器锁(GIL)时,使用线程而不是Python进程作为并发工作者会更有效。

例如,在Cython函数的with nogil 块中编写CPU密集型代码。

如果希望代码有效地使用线程,只需传递 preferre='threads' 作为 joblib.Parallel 构造函数的参数即可。在这种情况下,joblib将自动使用 "threading" 后端,而不是默认的 "loky" 后端

>>> Parallel(n_jobs=2, prefer=threads')(
...     delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

也可以在上下文管理器的帮助下手动选择特定的后端实现:

>>> from joblib import parallel_backend
>>> with parallel_backend('threading', n_jobs=2):
...    Parallel()(delayed(sqrt)(i ** 2) for i in range(10))
...
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

后者在调用内部使用 joblib.Parallel 的库时特别有用,不会将后端部分作为其公共API的一部分公开。

'loky' 后端可能并不总是可获取。

一些罕见的系统不支持多处理(例如Pyodide)。在这种情况下,loky后端不可用,使用线程作为默认后端。

除了内置的joblib后端之外,还可以使用几个特定于集群的后端:

序列化与进程

要在多个python进程之间共享函数定义,必须依赖序列化协议。python中的标准协议是 pickle ,但它在标准库中的默认实现有几个限制。例如,它不能序列化交互式定义的函数或在 __main__ 模块中定义的函数。

为了避免这种限制, loky 后端现在依赖于 cloudpickle 以序列化python对象。 cloudpickle 是 pickle 协议的另一种实现方式,允许序列化更多的对象,特别是交互式定义的函数。因此,对于大多数用途, loky 后端应该可以完美的工作。

cloudpickle 的主要缺点就是它可能比标准类库中的 pickle 慢,特别是,对于大型python字典或列表来说,这一点至关重要,因为它们的序列化时间可能慢100倍。有两种方法可以更改 joblib 的序列化过程以缓和此问题:

共享内存语义

joblib的默认后端将在独立的Python进程中运行每个函数调用,因此它们不能更改主程序中定义的公共Python对象。

然而,如果并行函数确实需要依赖于线程的共享内存语义,则应显示的使用 require='sharemem' ,例如:

>>> shared_set = set()
>>> def collect(x):
...    shared_set.add(x)
...
>>> Parallel(n_jobs=2, require='sharedmem')(
...     delayed(collect)(i) for i in range(5))
[None, None, None, None, None]
>>> sorted(shared_set)
[0, 1, 2, 3, 4]

请记住,从性能的角度来看,依赖共享内存语义可能是次优的,因为对共享Python对象的并发访问将受到锁争用的影响。

注意,不使用共享内存的情况下,任务进程之间的内存资源是相互独立的,举例说明如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from joblib import Parallel, delayed, parallel_backend
from collections import deque
GLOBAL_LIST = []
class TestClass():
    def __init__(self):
        self.job_queue = deque()
    def add_jobs(self):
        i = 0
        while i < 3:
            time.sleep(1)
            i += 1
            GLOBAL_LIST.append(i)
            self.job_queue.append(i)
            print('obj_id:', id(self),  'job_queue:', self.job_queue, 'global_list:', GLOBAL_LIST)
def get_job_queue_list(obj):
    i = 0
    while not obj.job_queue and i < 3:
        time.sleep(1)
        i += 1
        print('obj_id:', id(obj), 'job_queue:', obj.job_queue, 'global_list:', GLOBAL_LIST)
    return obj.job_queue
if __name__ == "__main__":
    obj = TestClass()
    def test_fun():
        with parallel_backend("multiprocessing", n_jobs=2):
            Parallel()(delayed(get_job_queue_list)(obj) for i in range(2))
    thread = threading.Thread(target=test_fun, name="parse_log")
    thread.start()
    time.sleep(1)
    obj.add_jobs()
    print('global_list_len:', len(GLOBAL_LIST))

控制台输出:

obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1]) global_list: [1]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2]) global_list: [1, 2]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2, 3]) global_list: [1, 2, 3]
global_list_len: 3

通过输出可知,通过joblib.Parallel开启的进程,其占用内存和主线程占用的内存资源是相互独立

复用worer池

一些算法需要对并行函数进行多次连续调用,同时对中间结果进行处理。在一个循环中多次调用 joblib.Parallel 次优的,因为它会多次创建和销毁一个workde(线程或进程)池,这可能会导致大量开销。

在这种情况下,使用 joblib.Parallel 类的上下文管理器API更有效,以便对 joblib.Parallel 对象的多次调用可以复用同一worker池。

from joblib import Parallel, delayed
from math import sqrt
with Parallel(n_jobs=2) as parallel:
   accumulator = 0.
   n_iter = 0
   while accumulator < 1000:
       results = parallel(delayed(sqrt)(accumulator + i ** 2) for i in range(5))
       accumulator += sum(results)  # synchronization barrier
       n_iter += 1
print(accumulator, n_iter)  #输出: 1136.5969161564717 14                          

请注意,现在基于进程的并行默认使用 'loky' 后端,该后端会自动尝试自己维护和重用worker池,即使是在没有上下文管理器的调用中也是如此

笔者实践发现,即便采用这种实现方式,其运行效率也是非常低下的,应该尽量避免这种设计(实践环境 Python3.6)

Parallel参考文档

class joblib.Parallel(n_jobs=default(None), backend=None, return_generator=False, verbose=default(0), timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None))

常用参数说明

并发运行作业的最大数量,例如当 backend='multiprocessing' 时Python工作进程的数量,或者当 backend='threading' 时线程池的大小。如果设置为 -1,则使用所有CPU。如果设置为1,则根本不使用并行计算代码,并且行为相当于一个简单的python for循环。此模式与 timeout 不兼容。如果 n_jobs 小于-1,则使用 (n_cpus+1+n_jobs) 。因此,如果 n_jobs=-2 ,将使用除一个CPU之外的所有CPU。如果为 None ,则默认 n_jobs=1 ,除非在 parallel_backend() 上下文管理器下执行调用,此时会为 n_jobs 设置另一个值。

指定并行化后端实现。支持的后端有:

不建议在类库中调用 Parallel 时对 backend 名称进行硬编码,取而代之,建议设置软提示( prefer )或硬约束( require ),以便库用户可以使用 parallel_backend() 上下文管理器从外部更改 backend 。

如果为 True ,则对此实例的调用将返回一个生成器,并在结果可获取时立即按原始顺序返回结果。请注意,预期用途是一次运行一个调用。对同一个Parallel对象的多次调用将导致 RuntimeError

如果使用 parallel_backen() 上下文管理器时没有指定特定后端,则选择默认 prefer 给定值。默认的基于进程的后端是 loky ,而默认的基于线程的后端则是 threading 。如果指定了 backend 参数,则忽略该参数。

用于选择后端的硬约束。如果设置为 'sharedmem' ,则所选后端将是单主机和基于线程的,即使用户要求使用具有 parallel_backend 的非基于线程的后端。

到此这篇关于Python中的Joblib库使用学习总结的文章就介绍到这了,更多相关Python中的Joblib库内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文