Python异步编程之asyncio.create_task()用法示例解析
作者:吐个泡泡v
前言
asyncio.create_task() 是Python 3.7+引入的函数,用于创建一个Task对象。Task是对协程的封装,可以让协程在事件循环中并发执行。
1. 基本语法
task = asyncio.create_task(coro)
coro: 要包装的协程对象- 返回值: Task对象
2. 基础用法示例
2.1 简单的任务创建
import asyncio
import time
async def say_hello(name, delay):
print(f"开始执行 {name}")
await asyncio.sleep(delay)
print(f"Hello, {name}!")
return f"完成 {name}"
async def main():
# 创建任务
task1 = asyncio.create_task(say_hello("Alice", 2))
task2 = asyncio.create_task(say_hello("Bob", 1))
# 等待任务完成
result1 = await task1
result2 = await task2
print(result1)
print(result2)
# 运行主函数
asyncio.run(main())2.2 多个任务并发执行
import asyncio
async def fetch_data(url, delay):
print(f"开始获取 {url}")
await asyncio.sleep(delay)
return f"数据来自 {url}"
async def main():
# 创建多个任务
tasks = [
asyncio.create_task(fetch_data("https://api1.com", 1)),
asyncio.create_task(fetch_data("https://api2.com", 2)),
asyncio.create_task(fetch_data("https://api3.com", 1.5))
]
# 等待所有任务完成
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())3. 高级用法
3.1 任务的取消
import asyncio
async def long_running_task():
try:
for i in range(10):
print(f"任务执行中... {i}")
await asyncio.sleep(1)
return "任务完成"
except asyncio.CancelledError:
print("任务被取消")
raise
async def main():
task = asyncio.create_task(long_running_task())
# 3秒后取消任务
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("捕获到取消异常")
asyncio.run(main())3.2 任务状态检查
import asyncio
async def sample_task():
await asyncio.sleep(2)
return "完成"
async def main():
task = asyncio.create_task(sample_task())
print(f"任务创建后 - 完成: {task.done()}")
await asyncio.sleep(1)
print(f"等待1秒后 - 完成: {task.done()}")
await task
print(f"任务完成后 - 完成: {task.done()}")
print(f"任务结果: {task.result()}")
asyncio.run(main())3.3 异常处理
import asyncio
async def task_with_exception():
await asyncio.sleep(1)
raise ValueError("这是一个错误")
async def normal_task():
await asyncio.sleep(2)
return "正常完成"
async def main():
task1 = asyncio.create_task(task_with_exception())
task2 = asyncio.create_task(normal_task())
try:
# 使用 gather 并设置 return_exceptions=True 来处理异常
results = await asyncio.gather(task1, task2, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 发生异常: {result}")
else:
print(f"任务 {i} 结果: {result}")
except Exception as e:
print(f"捕获异常: {e}")
asyncio.run(main())4. 实际应用场景
4.1 并发HTTP请求
示例一:
import asyncio
import aiohttp
async def fetch_url(session, url):
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
return f"错误: {e}"
async def fetch_multiple_urls():
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/1'
]
async with aiohttp.ClientSession() as session:
# 创建任务列表
tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
print(f"URL {i+1}: {len(result) if isinstance(result, str) else result} 字符")
# 注意:需要安装 aiohttp: pip install aiohttp
# asyncio.run(fetch_multiple_urls())示例二:
import asyncio
import aiohttp
async def process_http_request():
# 使用aiohttp进行异步HTTP请求
timeout = aiohttp.ClientTimeout(total=1000)
async with aiohttp.ClientSession(timeout=timeout) as session:
headers = {}
data = {}
url = ""
is_error = False
try:
print(f"HTTP异步请求开始,data: {data} ")
async with session.post(
url=url,
headers=headers,
json=data,
timeout=timeout # 为单个请求设置超时
) as response:
if response.status == 200:
# 获取响应数据
response_data = await response.json()
if response_data['data']['status'] != 200:
print(f"HTTP异步请求处理失败: {response_data} ")
is_error = True
else:
print(f"HTTP异步请求处理失败,状态码: {response.status}")
is_error = True
except asyncio.TimeoutError:
print(f"HTTP异步请求超时!")
is_error = True
print(f"HTTP异步请求完成")
return is_error
4.2 数据库操作并发
import asyncio
# import asyncpg # 示例使用 asyncpg
async def query_database(query_id):
# 模拟数据库查询
await asyncio.sleep(1) # 模拟查询时间
return f"查询结果 {query_id}"
async def concurrent_queries():
# 创建多个查询任务
tasks = [
asyncio.create_task(query_database(i))
for i in range(5)
]
# 并发执行查询
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(concurrent_queries())create_task() vs ensure_future()
在Python 3.7之前,我们使用 asyncio.ensure_future() 来创建任务:
# 旧方式(仍然可用) task = asyncio.ensure_future(coro) # 新方式(推荐) task = asyncio.create_task(coro)
两者的主要区别:
create_task()更明确地表示创建一个任务ensure_future()功能更广泛,可以处理多种类型的awaitable对象- 在现代Python中,推荐使用
create_task()
5. 最佳实践
5.1 合理使用任务
import asyncio
async def good_practice():
# 好的做法:明确创建任务
task1 = asyncio.create_task(some_coroutine())
task2 = asyncio.create_task(another_coroutine())
# 等待结果
result1 = await task1
result2 = await task2
return result1, result2
async def avoid_this():
# 避免这样做:直接await协程不会并发执行
result1 = await some_coroutine()
result2 = await another_coroutine()
return result1, result25.2 使用超时控制
import asyncio
async def slow_operation():
await asyncio.sleep(5)
return "完成"
async def main():
try:
task = asyncio.create_task(slow_operation())
# 设置3秒超时
result = await asyncio.wait_for(task, timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("操作超时")
# 取消任务
task.cancel()
asyncio.run(main())6.常见问题
在Python的asyncio中,asyncio.create_task()创建的任务不需要被await也会运行。
asyncio任务执行机制
当你使用asyncio.create_task()时,任务会被立即调度执行,而不需要显式地await它。这是因为:
asyncio.create_task()会将任务添加到事件循环中进行调度
事件循环会自动运行这些任务,不需要显式await
await的作用是等待任务完成并获取结果,而不是触发任务执行
1. 任务创建与调度
当调用asyncio.create_task()时,会发生以下事情:
(1)任务被调度:任务被添加到事件循环(event loop)中进行调度
(2)立即执行:任务会在事件循环的下一个周期开始执行
(3)并发运行:任务与当前代码以及其他任务并发运行
2. await的作用
await关键字的作用是:
(1)等待任务完成:暂停当前协程直到任务完成
(2)获取结果:获取任务的返回值
(3)异常处理:传播任务中发生的异常
但是await不是触发任务执行的必要条件。
3. 实际示例
import asyncio
async def background_task(name):
print(f"任务 {name} 开始")
await asyncio.sleep(2)
print(f"任务 {name} 完成")
return f"结果 {name}"
async def main():
# 创建任务但不await - 任务会自动运行
task1 = asyncio.create_task(background_task("1"))
task2 = asyncio.create_task(background_task("2"))
print("任务已创建")
# 做一些其他工作
await asyncio.sleep(1)
print("主协程继续执行")
# 等待任务完成获取结果
result1 = await task1
result2 = await task2
print(f"结果: {result1}, {result2}")
# 运行主函数
asyncio.run(main())
从输出结果可以看出,即使没有立即await任务,它们也已经开始运行了。
7. 总结
asyncio.create_task() 是Python异步编程中的核心函数,学会使用能够:
- 并发执行协程:通过创建Task对象实现真正的并发
- 灵活控制任务:可以取消、检查状态、处理异常
- 提高程序性能:特别适合I/O密集型操作
- 简化异步代码:让异步编程更加直观和易用
到此这篇关于Python异步编程之asyncio.create_task()用法的文章就介绍到这了,更多相关Python asyncio.create_task()用法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
