python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python Celery使用

Python中Celery分布式任务队列的介绍与使用指南

作者:檀越@新空间

Celery 是一个开源的、分布式的任务队列系统,使用 Python 编写,它的核心目的是处理异步任务和定时任务,下面小编就为大家详细介绍一下Celery的具体使用吧

第一部分:Celery 是什么

Celery 是一个开源的、分布式的任务队列系统,使用 Python 编写。它的核心目的是处理异步任务定时任务

核心概念类比:餐厅

想象一个繁忙的餐厅:

服务员将订单放到传菜口后,就可以立刻回去服务下一个顾客,而不必在厨房等待披萨做完。厨房的工作人员会从传菜口按顺序取出订单并制作披萨。这就是 Celery 的核心理念:将耗时的操作放到后台执行,让主程序(如 Web 服务器)能够快速响应客户端

第二部分:为什么需要使用 Celery?

在 Web 开发中,有些操作执行起来很慢,如果让用户在网页上一直等待直到操作完成,体验会非常差,甚至导致请求超时。

典型使用场景:

1.耗时任务

2.定时任务

3.大规模处理

同时处理成千上万个 API 调用或数据清洗任务。

第三部分:Celery 的核心组件

一个完整的 Celery 系统通常包含四个部分:

Celery Client (客户端)

Message Broker (消息代理)

Celery Worker (工作者)

Result Backend (结果后端) (可选):

数据流总结:Client -(发送任务)-> Broker -(分配任务)-> Worker -(存储结果)-> Result Backend

第四部分:快速入门与实践

我们将使用 Redis 作为消息代理和结果后端,因为它安装简单且功能强大。

步骤 1:安装必要的库

pip install celery redis

确保你已安装并启动了 Redis 服务。

步骤 2:创建 Celery 应用和任务 (tasks.py)

# tasks.py
from celery import Celery

# 创建 Celery 实例,'my_app' 是当前项目的名称
app = Celery('my_app',
             broker='redis://localhost:6379/0',      # 使用 Redis 作为 Broker
             backend='redis://localhost:6379/0')     # 使用 Redis 作为 Backend

# 定义一个任务
@app.task
def add(x, y):
    print(f"正在计算 {x} + {y}...")
    # 模拟一个耗时操作
    import time
    time.sleep(5)
    result = x + y
    print(f"计算完成!结果是 {result}")
    return result

@app.task
def send_email(to, subject, body):
    # 模拟发送邮件的逻辑
    print(f"正在向 {to} 发送邮件,主题:{subject}")
    # ... 实际的发信代码 ...
    return f"邮件已发送至 {to}"

步骤 3:启动 Worker

在终端中,进入 tasks.py 所在的目录,运行以下命令启动 Worker:

celery -A tasks worker --loglevel=info

如果成功,你会看到类似下面的输出,表示 Worker 已启动并在等待任务:

 -------------- celery@YourComputer v5.3.0 (emerald-rush)
--- ***** -----
-- ******* ----- [Configuration]
- *** --- * --- . broker:      redis://localhost:6379/0
- ** ---------- . result:      redis://localhost:6379/0
- ** ---------- . app:         my_app:0x...
- ** ---------- . concurrency: 8 (prefork)
- ** ---------- . tasks:       ['tasks.add', 'tasks.send_email']
--- ***** ----- [Queues]
 -------------- . celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add
  . tasks.send_email

[2024-...] [INFO] Task tasks.add[1234-...] received

步骤 4:调用任务

现在,打开一个 Python Shell(在另一个终端中),来调用我们定义的任务。

# 在 Python Shell 中
from tasks import add, send_email

# 1. 异步调用 - 最常用的方式
# 这行代码会立即返回一个 AsyncResult 对象,而不会阻塞程序
result = add.delay(4, 6) # .delay() 是快捷的异步调用方法
print(f"任务ID:{result.id}") # 立即打印任务ID,此时任务正在后台运行

# 2. 获取任务结果(可选)
# 因为我们的任务有 5 秒休眠,所以需要等待一下再获取结果
print("正在等待结果...")
value = result.get(timeout=10) # .get() 会阻塞,直到任务完成并返回结果
print(f"任务结果是:{value}")

# 3. 检查任务状态(可选)
print(f"任务状态:{result.status}") # 可能是 PENDING, SUCCESS, FAILURE 等

# 4. 调用发送邮件的任务
email_result = send_email.delay('user@example.com', '欢迎光临', '感谢您注册!')
print(f"邮件任务ID:{email_result.id}")

步骤 5:监控任务(可选)

Celery 提供了一个强大的命令行工具来监控 Worker 的状态和任务队列。你还可以使用更强大的工具如 Flower

# 安装 Flower
pip install flower

# 启动 Flower (在项目目录下)
celery -A tasks flower

# 然后在浏览器中访问 http://localhost:5555

第五部分:在 Web 框架(以 Flask 为例)中使用

将 Celery 集成到 Web 框架中是其最常见的用法。

# app.py
from flask import Flask, jsonify
from celery import Celery

# 配置 Flask
app = Flask(__name__)

# 配置 Celery(通常从配置文件读取)
def make_celery(app):
    celery = Celery(
        app.import_name,
        broker='redis://localhost:6379/0',
        backend='redis://localhost:6379/0'
    )
    # 可选:更新Celery配置,使其与Flask配置保持一致
    celery.conf.update(app.config)
    return celery

celery = make_celery(app)

@celery.task
def background_task():
    # 一个模拟的耗时任务
    import time
    time.sleep(10)
    return "后台任务完成!"

@app.route('/start_task', methods=['POST'])
def start_task():
    # 在视图函数中触发后台任务
    task = background_task.delay()
    return jsonify({'task_id': task.id}), 202 # 202 Accepted 表示请求已被接受处理

@app.route('/check_status/<task_id>')
def check_status(task_id):
    task = background_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {'state': task.state, 'status': '任务正在排队或执行中...'}
    elif task.state == 'SUCCESS':
        response = {'state': task.state, 'result': task.result}
    else: # FAILURE 等其他状态
        response = {'state': task.state, 'status': str(task.info)}
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

总结

特性描述
本质分布式任务队列
核心价值异步处理、解耦、提高响应速度
关键组件Client, Broker, Worker, Result Backend
工作流程应用发布任务 -> Broker 传递 -> Worker 执行
使用场景邮件发送、文件处理、定时任务、复杂计算

Celery 是 Python 生态中处理后台任务的标杆工具,熟练掌握它能极大地提升你构建复杂、高性能应用的能力。

到此这篇关于Python中Celery分布式任务队列的介绍与使用指南的文章就介绍到这了,更多相关Python Celery使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文