python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python asyncio.create_task()用法

Python异步编程之asyncio.create_task()用法示例解析

作者:吐个泡泡v

asyncio.create_task()是Python中用于将协程包装为任务并加入事件循环的核心方法,这篇文章主要介绍了Python异步编程之asyncio.create_task()用法的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下

前言

asyncio.create_task() 是Python 3.7+引入的函数,用于创建一个Task对象。Task是对协程的封装,可以让协程在事件循环中并发执行。

1. 基本语法

task = asyncio.create_task(coro)

2. 基础用法示例

2.1 简单的任务创建

import asyncio
import time

async def say_hello(name, delay):
    print(f"开始执行 {name}")
    await asyncio.sleep(delay)
    print(f"Hello, {name}!")
    return f"完成 {name}"

async def main():
    # 创建任务
    task1 = asyncio.create_task(say_hello("Alice", 2))
    task2 = asyncio.create_task(say_hello("Bob", 1))
    
    # 等待任务完成
    result1 = await task1
    result2 = await task2
    
    print(result1)
    print(result2)

# 运行主函数
asyncio.run(main())

2.2 多个任务并发执行

import asyncio

async def fetch_data(url, delay):
    print(f"开始获取 {url}")
    await asyncio.sleep(delay)
    return f"数据来自 {url}"

async def main():
    # 创建多个任务
    tasks = [
        asyncio.create_task(fetch_data("https://api1.com", 1)),
        asyncio.create_task(fetch_data("https://api2.com", 2)),
        asyncio.create_task(fetch_data("https://api3.com", 1.5))
    ]
    
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    
    for result in results:
        print(result)

asyncio.run(main())

3. 高级用法

3.1 任务的取消

import asyncio

async def long_running_task():
    try:
        for i in range(10):
            print(f"任务执行中... {i}")
            await asyncio.sleep(1)
        return "任务完成"
    except asyncio.CancelledError:
        print("任务被取消")
        raise

async def main():
    task = asyncio.create_task(long_running_task())
    
    # 3秒后取消任务
    await asyncio.sleep(3)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("捕获到取消异常")

asyncio.run(main())

3.2 任务状态检查

import asyncio

async def sample_task():
    await asyncio.sleep(2)
    return "完成"

async def main():
    task = asyncio.create_task(sample_task())
    
    print(f"任务创建后 - 完成: {task.done()}")
    
    await asyncio.sleep(1)
    print(f"等待1秒后 - 完成: {task.done()}")
    
    await task
    print(f"任务完成后 - 完成: {task.done()}")
    print(f"任务结果: {task.result()}")

asyncio.run(main())

3.3 异常处理

import asyncio

async def task_with_exception():
    await asyncio.sleep(1)
    raise ValueError("这是一个错误")

async def normal_task():
    await asyncio.sleep(2)
    return "正常完成"

async def main():
    task1 = asyncio.create_task(task_with_exception())
    task2 = asyncio.create_task(normal_task())
    
    try:
        # 使用 gather 并设置 return_exceptions=True 来处理异常
        results = await asyncio.gather(task1, task2, return_exceptions=True)
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"任务 {i} 发生异常: {result}")
            else:
                print(f"任务 {i} 结果: {result}")
    except Exception as e:
        print(f"捕获异常: {e}")

asyncio.run(main())

4. 实际应用场景

4.1 并发HTTP请求

示例一:

import asyncio
import aiohttp

async def fetch_url(session, url):
    try:
        async with session.get(url) as response:
            return await response.text()
    except Exception as e:
        return f"错误: {e}"

async def fetch_multiple_urls():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1'
    ]
    
    async with aiohttp.ClientSession() as session:
        # 创建任务列表
        tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]
        
        # 并发执行所有任务
        results = await asyncio.gather(*tasks)
        
        for i, result in enumerate(results):
            print(f"URL {i+1}: {len(result) if isinstance(result, str) else result} 字符")

# 注意:需要安装 aiohttp: pip install aiohttp
# asyncio.run(fetch_multiple_urls())

示例二:

import asyncio
import aiohttp


async def process_http_request():
    # 使用aiohttp进行异步HTTP请求
    timeout = aiohttp.ClientTimeout(total=1000)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        headers = {}
        data = {}
        url = ""
        is_error = False
        try:
            print(f"HTTP异步请求开始,data: {data} ")
            async with session.post(
                    url=url,
                    headers=headers,
                    json=data,
                    timeout=timeout  # 为单个请求设置超时
            ) as response:
                if response.status == 200:
                    # 获取响应数据
                    response_data = await response.json()
                    if response_data['data']['status'] != 200:
                        print(f"HTTP异步请求处理失败: {response_data} ")
                        is_error = True
                else:
                    print(f"HTTP异步请求处理失败,状态码: {response.status}")
                    is_error = True
        except asyncio.TimeoutError:
            print(f"HTTP异步请求超时!")
            is_error = True

    print(f"HTTP异步请求完成")
    return is_error

4.2 数据库操作并发

import asyncio
# import asyncpg  # 示例使用 asyncpg

async def query_database(query_id):
    # 模拟数据库查询
    await asyncio.sleep(1)  # 模拟查询时间
    return f"查询结果 {query_id}"

async def concurrent_queries():
    # 创建多个查询任务
    tasks = [
        asyncio.create_task(query_database(i)) 
        for i in range(5)
    ]
    
    # 并发执行查询
    results = await asyncio.gather(*tasks)
    
    for result in results:
        print(result)

asyncio.run(concurrent_queries())

create_task() vs ensure_future()

在Python 3.7之前,我们使用 asyncio.ensure_future() 来创建任务:

# 旧方式(仍然可用)
task = asyncio.ensure_future(coro)

# 新方式(推荐)
task = asyncio.create_task(coro)

两者的主要区别:

5. 最佳实践

5.1 合理使用任务

import asyncio

async def good_practice():
    # 好的做法:明确创建任务
    task1 = asyncio.create_task(some_coroutine())
    task2 = asyncio.create_task(another_coroutine())
    
    # 等待结果
    result1 = await task1
    result2 = await task2
    
    return result1, result2

async def avoid_this():
    # 避免这样做:直接await协程不会并发执行
    result1 = await some_coroutine()
    result2 = await another_coroutine()
    return result1, result2

5.2 使用超时控制

import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "完成"

async def main():
    try:
        task = asyncio.create_task(slow_operation())
        # 设置3秒超时
        result = await asyncio.wait_for(task, timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时")
        # 取消任务
        task.cancel()

asyncio.run(main())

6.常见问题

在Python的asyncio中,asyncio.create_task()创建的任务不需要被await也会运行。

asyncio任务执行机制

当你使用asyncio.create_task()时,任务会被立即调度执行,而不需要显式地await它。这是因为:

asyncio.create_task()会将任务添加到事件循环中进行调度

事件循环会自动运行这些任务,不需要显式await

await的作用是等待任务完成并获取结果,而不是触发任务执行

1. 任务创建与调度

当调用asyncio.create_task()时,会发生以下事情:

(1)任务被调度:任务被添加到事件循环(event loop)中进行调度

(2)立即执行:任务会在事件循环的下一个周期开始执行

(3)并发运行:任务与当前代码以及其他任务并发运行

2. await的作用

await关键字的作用是:

(1)等待任务完成:暂停当前协程直到任务完成

(2)获取结果:获取任务的返回值

(3)异常处理:传播任务中发生的异常

但是await不是触发任务执行的必要条件。

3. 实际示例

import asyncio

async def background_task(name):
    print(f"任务 {name} 开始")
    await asyncio.sleep(2)
    print(f"任务 {name} 完成")
    return f"结果 {name}"

async def main():
    # 创建任务但不await - 任务会自动运行
    task1 = asyncio.create_task(background_task("1"))
    task2 = asyncio.create_task(background_task("2"))
    
    print("任务已创建")
    
    # 做一些其他工作
    await asyncio.sleep(1)
    print("主协程继续执行")
    
    # 等待任务完成获取结果
    result1 = await task1
    result2 = await task2
    
    print(f"结果: {result1}, {result2}")

# 运行主函数
asyncio.run(main())

从输出结果可以看出,即使没有立即await任务,它们也已经开始运行了。

7. 总结

asyncio.create_task() 是Python异步编程中的核心函数,学会使用能够:

  1. 并发执行协程:通过创建Task对象实现真正的并发
  2. 灵活控制任务:可以取消、检查状态、处理异常
  3. 提高程序性能:特别适合I/O密集型操作
  4. 简化异步代码:让异步编程更加直观和易用

到此这篇关于Python异步编程之asyncio.create_task()用法的文章就介绍到这了,更多相关Python asyncio.create_task()用法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文