python异步IO的项目实践
作者:快乐江小鱼
asyncio是python3.4引入的标准库,内置对异步IO的支持。asyncio的编程模型是一个消息循环,从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程放入EventLoop中执行,就实现了异步IO。
协程又称为微线程。子程序在所有语言中都是层级调用,子程序调用通过栈实现。一个线程就是执行一个子程序。协程最大的优势是执行效率高,子程序切换不是线程切换,而是由程序自身控制。第二个优势是不需要多线程的锁机制。python对协程的支持通过generator实现,generator中,可以通过for循环来迭代,也可以不断调用next()函数获取由yield语句返回的下一个值。
协程示例,@asyncio.coroutine把一个生成器标记为coroutine类型,自python3.8弃用,使用async def替代。
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Hello World! (%s)' % threading.current_thread())
# yield from调用另一个生成器
r = yield from asyncio.sleep(1)
print('Hello Again! (%s)' % threading.current_thread())
loop = asyncio.get_event_loop()
# 两个协程是由同一个线程并发执行的
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()结果:
Hello World! (<_MainThread(MainThread, started 49300)>)
Hello World! (<_MainThread(MainThread, started 49300)>)
Hello again! (<_MainThread(MainThread, started 49300)>)
Hello again! (<_MainThread(MainThread, started 49300)>)
推荐使用async/await语法编写协程应用。
# 直接调用main()并不会执行协程应用。
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world传统生产消费模型,一个线程写消息,一个线程读消息,通过锁机制控制队列和等待,但一不小心就可能死锁。改用协程生产消息后直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者,效率极高。
# consumer函数是一个生成器
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[COUSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
# 启动生成器
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return:%s' % r)
c.close()
# 注意到consumer函数是一个generator,把一个consumer传入produce后:
# 首先调用c.send(None)启动生成器;
# 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
# consumer通过yield拿到消息处理,又通过yield把结果传回;
# produce拿到consumer处理的结果,继续生产下一条消息;
# produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
# 整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
c = consumer()
produce(c)asyncio可以实现单线程并发IO操作,用单线程+coroutine实现多用户的高并发支持,asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。
import asyncio
from aiohttp import web
def index(request):
return web.Response(body=b'<h1>Index</h1>')
def hello(request):
yield from asyncio.sleep(0.5)
text = '<h1>Hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
@asyncio.coroutine
def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_router('GET', '/hello/{name}', hello)
srv = yield from loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()到此这篇关于python异步IO的项目实践的文章就介绍到这了,更多相关python异步IO内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
