python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Pipeline处理CPU密集型或阻塞型操作

Scrapy的Pipeline之处理CPU密集型或阻塞型操作详解

作者:bluespacezero

这篇文章主要介绍了Scrapy的Pipeline之处理CPU密集型或阻塞型操作详解,Twisted框架的reactor适合于处理短的、非阻塞的操作,Twisted提供了线程池来在其他的线程而不是主线程(Twisted的reactor线程)中执行慢的操作,需要的朋友可以参考下

Pipeline处理CPU密集型或阻塞型操作

Twisted框架的reactor适合于处理短的、非阻塞的操作。但是如果要处理一些复杂的、或者包含阻塞的操作又该怎么办呢?Twisted提供了线程池来在其他的线程而不是主线程(Twisted的reactor线程)中执行慢的操作——使用reactor.callInThread() API。这就意味着reactor在执行计算时还能保持运行并对事件做出反应。一定要记住线程池中的处理不是线程安全的。这就意味着当你使用了全局的状态之后,还要面临所有那些传统的多线程编程的同步问题。下面是一个简单的例子:

class UsingBlocking(object):
    @defer.inlineCallbacks
    def process_item(self, item, spider):
        price = item["price"][0]

        out = defer.Deferred()
        reactor.callInThread(self._do_calculation, price, out)

        item["price"][0] = yield out

        defer.returnValue(item)

    def _do_calculation(self, price, out):
        new_price = price + 1
        time.sleep(0.10)
        reactor.callFromThread(out.callback, new_price)

在上面的Pipeline中,对于每个Item,我们提取出它的price字段,想要在_do_caculation()方法中对它进行处理。这个方法使用了time.sleep(),一个阻塞的操作。我们调用reactor.callInThread()方法使它运行在另一个线程中,该方法的第一个参数是想要调用的函数,后面的参数则会全部传递给被调用的函数作为参数。在这里我们给被调用的函数传递了price,还有一个创建的Deferred对象out。当_do_caculation()函数完成计算后,我们会使用out的回调函数来返回这个值。接下来,yield这个 Deferred对象并为price设置一个新的值,最后返回Item。

在_do_caculation()函数中我们把price加一,然后休眠了100ms。其实这个时间是很长的,如果在reactor的线程中调用这个函数,那就意味着我们每秒只能处理不超过10个页面。不过如果把它放在另一个线程中来调用就不会出现这种问题了。这些计算任务会在线程池中排队,等待某个线程处于可用状态,然后这个线程就会执行这个任务,休眠100ms。最后一步是激活out的回调函数。通常情况下,我们可以这样来激活:out.callback(new_price),但是既然现在我们处于另外一个线程中,这样做就不安全了。如果我们执意这样做了,这个Deferred对象的代码,也就是Scrapy的功能就会在别的线程中执行,这样会导致数据被损坏。所以我们调用了reactor.callFromThread()函数,同样的,它也是以一个函数作为参数,并把额外的参数直接传递给被调用的函数。这个函数会在主线程中排队并等待被调用,它反过来解锁了process_item()方法中的yield语句,并恢复Scrapy对这个Item的操作。

如果我们的pipeline中含有全局状态会怎么样呢?比如,计数器或者平均值等,我们需要在_do_caculation()函数中使用的。例如有以下两个变量,beta和delta:

class UsingBlocking(object):
    def __init__(self):
        self.beta, self.delta = 0, 0
        ...

    def _do_calculation(self, price, out):
        self.beta += 1
        time.sleep(0.001)
        self.delta += 1
        new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01

        time.sleep(0.10)...

上面的代码有一些问题,并且在运行的时候会给出assertion错误。这是因为,如果一个线程在self.beta += 1和self.delta += 1语句之间切换的话,另一个线程就会恢复执行并使用beta和delta的值来计算price,这里线程会发现这两个值处于不一致的状态(beta比delta大),这样,错误的产生了。中间短的sleep会让线程切换更可能发生,不过即使没有它,同样也会出现竞态条件。为了阻止竞态条件的发生,我们必须使用锁,例如Python的threading.RLock()锁。使用了这个递归锁,就能确保两个线程不会同时执行锁保护的临界区的代码:

class UsingBlocking(object):
    def __init__(self):
        ...
        self.lock = threading.RLock()
        ...

    def _do_calculation(self, price, out):
        with self.lock:
            self.beta += 1
            ...
            new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01 ...

现在的代码就没问题了,要注意的是,我们不需要保护整个代码,只需要能够覆盖全局状态的使用就行了。

在ITEM_PIPELINES中加上:

ITEM_PIPELINES = { ...
    'properties.pipelines.computation.UsingBlocking': 500,
}

运行一下会发现,时延由于100ms的休眠的缘故变调了,不过吞吐量还是保持不变,大约每秒25个。

到此这篇关于Scrapy的Pipeline之处理CPU密集型或阻塞型操作详解的文章就介绍到这了,更多相关Pipeline处理CPU密集型或阻塞型操作内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文