Python Asynchronous Libraries asyncio and aiohttp Explained
Author: IT.BOB
This article mainly introduces how to use the Python asynchronous libraries asyncio and aiohttp. It is meant as a practical reference; if you spot mistakes or omissions, corrections are welcome.
asyncio
Version support
- The asyncio module was added to the standard library in Python 3.4.
- The async and await keywords were first introduced in Python 3.5.
- Versions earlier than Python 3.3 are not supported.
Key concepts
event_loop
Event loop: the program starts an endless loop, and the programmer registers functions (coroutines) on it. When the corresponding event occurs, the event loop calls the matching coroutine.
coroutine
Coroutine: a coroutine object. A function defined with the async keyword does not run when called; instead the call returns a coroutine object. The coroutine object must be registered on the event loop, which then drives it.
future
Future: represents the result of a task that will be, or has not yet been, executed. It is essentially no different from a task.
task
Task: a coroutine object is a function that can natively be suspended; a task is a further wrapper around a coroutine that also tracks the task's state. Task is a subclass of Future; it ties the coroutine and the Future together by wrapping the coroutine in a Future object.
async/await
Keywords introduced in Python 3.5 for defining coroutines: async defines a coroutine, and await suspends on a blocking asynchronous call. Their effect is, to some degree, similar to yield.
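A minimal sketch tying these concepts together (the function and variable names are illustrative, not from the original article):

import asyncio

async def greet(name):
    # await suspends this coroutine without blocking the event loop
    await asyncio.sleep(0)
    return 'Hello, ' + name

coro = greet('asyncio')        # calling an async def function only creates a coroutine object
print(type(coro))              # <class 'coroutine'>

loop = asyncio.get_event_loop()
task = loop.create_task(coro)  # wrap the coroutine in a Task (a subclass of Future)
loop.run_until_complete(task)  # the event loop drives the task to completion
print(task.result())           # Hello, asyncio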
Workflow
- Define/create a coroutine object
- Wrap the coroutine into a task
- Create an event loop object
- Submit the task to the event loop and run it
import asyncio

async def hello(name):
    print('Hello,', name)

# Define/create the coroutine object
coroutine = hello("World")

# Create the event loop object
loop = asyncio.get_event_loop()

# Wrap the coroutine into a task
# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)

# Submit the task to the event loop and run it
loop.run_until_complete(task)
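On Python 3.7 and later, the same workflow is usually written with asyncio.run(), which creates the event loop, runs the coroutine, and closes the loop for you; a minimal equivalent sketch:

import asyncio

async def hello(name):
    print('Hello,', name)

# asyncio.run() builds a fresh event loop, runs the coroutine, and cleans up afterwards
asyncio.run(hello("World"))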
Concurrency
1. Create a list, tasks, containing multiple coroutines:
import asyncio

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

tasks = [do_some_work(1), do_some_work(2), do_some_work(4)]
2. Register the coroutines on the event loop:
- Option 1: use asyncio.wait()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
- Option 2: use asyncio.gather()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
3. Read the return values (note: task.result() only works when the entries in tasks are Task objects, e.g. wrapped with asyncio.ensure_future, rather than bare coroutines):

for task in tasks:
    print('Task ret: ', task.result())
4. Differences between asyncio.wait() and asyncio.gather():
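The comparison snippets below call a factorial coroutine that the article never defines; a minimal sketch of what it might look like, modeled on the example coroutine in the official asyncio documentation:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print("Task {}: Compute factorial({})...".format(name, i))
        # Yield control to the event loop between steps
        await asyncio.sleep(1)
        f *= i
    print("Task {}: factorial({}) = {}".format(name, number, f))
    return f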
They accept different arguments:
asyncio.wait(): takes a single iterable (typically a list) containing the tasks.
# Wrap the coroutines into task objects with asyncio.ensure_future
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))
]
# The bare coroutines can also be passed without wrapping
# tasks = [
#     factorial("A", 2),
#     factorial("B", 3),
#     factorial("C", 4)
# ]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
asyncio.gather(): more flexible; note that when a list is passed, the * unpacking must not be omitted.
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))
]
# tasks = [
#     factorial("A", 2),
#     factorial("B", 3),
#     factorial("C", 4)
# ]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop = asyncio.get_event_loop()
group1 = asyncio.gather(*[factorial("A", i) for i in range(1, 3)])
group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[factorial("C", i) for i in range(1, 7)])
loop.run_until_complete(asyncio.gather(group1, group2, group3))
They return different results:
asyncio.wait(): returns dones (completed tasks) and pendings (unfinished tasks).
dones, pendings = await asyncio.wait(tasks)
for task in dones:
    print('Task ret: ', task.result())
asyncio.gather(): returns the results directly.
results = await asyncio.gather(*tasks)
for result in results:
    print('Task ret: ', result)
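Because both snippets use await, they have to run inside a coroutine; a minimal sketch putting the pieces together with the do_some_work coroutine from step 1:

import asyncio

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in (1, 2, 4)]
    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print('Task ret: ', task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())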
aiohttp
ClientSession session management
import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            print(resp.status)
            print(await resp.text())

asyncio.run(main())
Other request methods:
session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')
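Each of these follows the same pattern as get(); for example, a minimal sketch of a POST inside a session context (the body b'data' is just a placeholder):

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.post('http://httpbin.org/post', data=b'data') as resp:
            print(resp.status)
            print(await resp.text())

asyncio.run(main())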
Passing URL parameters
async def main():
    async with aiohttp.ClientSession() as session:
        params = {'key1': 'value1', 'key2': 'value2'}
        async with session.get('http://httpbin.org/get', params=params) as r:
            expect = 'http://httpbin.org/get?key1=value1&key2=value2'
            assert str(r.url) == expect
async def main():
    async with aiohttp.ClientSession() as session:
        params = [('key', 'value1'), ('key', 'value2')]
        async with session.get('http://httpbin.org/get', params=params) as r:
            expect = 'http://httpbin.org/get?key=value2&key=value1'
            assert str(r.url) == expect
Reading the response content
async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as r:
            # Status code
            print(r.status)
            # Body as text, with an optional encoding
            print(await r.text(encoding='utf-8'))
            # Raw bytes, for non-text content
            print(await r.read())
            # JSON content
            print(await r.json())
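For large or binary responses it can be preferable to stream the body instead of loading it all at once; a minimal sketch using the response's content stream (the URL, file name, and chunk size are assumptions):

import aiohttp
import asyncio

async def download(url, dest):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            with open(dest, 'wb') as fd:
                # Read the body in fixed-size chunks instead of buffering everything
                async for chunk in resp.content.iter_chunked(1024):
                    fd.write(chunk)

asyncio.run(download('http://httpbin.org/image/png', 'image.png'))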
Custom request headers
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"
}

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get', headers=headers) as r:
            print(r.status)
Set default headers for every request in the session:
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"
}

async def main():
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get('http://httpbin.org/get') as r:
            print(r.status)
Custom cookies
async def main():
    cookies = {'cookies_are': 'working'}
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/cookies', cookies=cookies) as resp:
            assert await resp.json() == {"cookies": {"cookies_are": "working"}}
Set cookies for every request in the session:
async def main():
    cookies = {'cookies_are': 'working'}
    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get('http://httpbin.org/cookies') as resp:
            assert await resp.json() == {"cookies": {"cookies_are": "working"}}
Setting a proxy
Note: only HTTP proxies are supported.
async def main():
    async with aiohttp.ClientSession() as session:
        proxy = "http://127.0.0.1:1080"
        async with session.get("http://python.org", proxy=proxy) as r:
            print(r.status)
Proxy requiring username/password authentication:
async def main():
    async with aiohttp.ClientSession() as session:
        proxy = "http://127.0.0.1:1080"
        proxy_auth = aiohttp.BasicAuth('username', 'password')
        async with session.get("http://python.org", proxy=proxy, proxy_auth=proxy_auth) as r:
            print(r.status)
The credentials can also be passed directly in the proxy URL:
async def main():
    async with aiohttp.ClientSession() as session:
        proxy = "http://username:password@127.0.0.1:1080"
        async with session.get("http://python.org", proxy=proxy) as r:
            print(r.status)
Asynchronous crawler example
import asyncio
import aiohttp
from lxml import etree
from datetime import datetime

headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"}

async def get_movie_url():
    req_url = "https://movie.douban.com/chart"
    async with aiohttp.ClientSession() as session:
        async with session.get(url=req_url, headers=headers) as response:
            result = await response.text()
            result = etree.HTML(result)
            return result.xpath("//*[@id='content']/div/div[1]/div/div/table/tr/td/a/@href")

async def get_movie_content(movie_url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url=movie_url, headers=headers) as response:
            result = await response.text()
            result = etree.HTML(result)
            movie = dict()
            name = result.xpath('//*[@id="content"]/h1/span[1]//text()')
            author = result.xpath('//*[@id="info"]/span[1]/span[2]//text()')
            movie["name"] = name
            movie["author"] = author
            return movie

def run():
    start = datetime.now()
    loop = asyncio.get_event_loop()
    # Fetch the list of movie detail URLs first
    movie_url_list = loop.run_until_complete(get_movie_url())
    # Then fetch all detail pages concurrently
    tasks = [get_movie_content(url) for url in movie_url_list]
    movies = loop.run_until_complete(asyncio.gather(*tasks))
    print(movies)
    print("Async elapsed time: {}".format(datetime.now() - start))

if __name__ == '__main__':
    run()
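When the URL list grows, it is common to cap concurrency and reuse one ClientSession for all requests instead of opening a new one per page; a minimal sketch of that refactoring (the limit of 5 is an arbitrary assumption, and headers refers to the dict defined above):

import asyncio
import aiohttp

async def fetch(session, url, sem):
    # The semaphore keeps at most `limit` requests in flight at any time
    async with sem:
        async with session.get(url, headers=headers) as response:
            return await response.text()

async def crawl(urls, limit=5):
    sem = asyncio.Semaphore(limit)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, url, sem) for url in urls))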
Summary
The above is based on personal experience. I hope it serves as a useful reference, and I hope you will continue to support 脚本之家.