Python Celery Task Queue
Introduction
Celery is the most mature distributed task queue framework in the Python ecosystem. It decouples time-consuming work from the main request flow through a message broker and hands it to independent worker processes for asynchronous execution. Typical use cases include email delivery, report generation, data processing, and scheduled jobs: any scenario that needs asynchronous execution and horizontal scalability.
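Before diving into the full configuration below, here is a minimal sketch of the producer/worker split, assuming a local Redis instance serves as both broker and result backend:

# A minimal sketch, assuming Redis runs locally on the default port.
from celery import Celery

app = Celery("demo", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

@app.task
def add(x: int, y: int) -> int:
    return x + y

# In the producer process (e.g. a web request handler):
result = add.delay(2, 3)          # enqueues the task and returns immediately
print(result.get(timeout=10))     # 5, once a worker (celery -A demo worker) picks it up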
Implementation
Basic Configuration and Task Definitions
# celery_config.py
from celery import Celery
from kombu import Queue

app = Celery("myproject")

# Basic configuration
app.conf.update(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/1",
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="Asia/Shanghai",
    enable_utc=True,
    # Task routing
    task_routes={
        "tasks.email.*": {"queue": "email"},
        "tasks.report.*": {"queue": "report"},
        "tasks.heavy.*": {"queue": "heavy"},
    },
    # Queue definitions
    task_queues=(
        Queue("default"),
        Queue("email"),
        Queue("report"),
        Queue("heavy"),
    ),
    # Default queue
    task_default_queue="default",
    # Concurrency
    worker_concurrency=4,
    worker_prefetch_multiplier=2,
    # Task timeouts
    task_soft_time_limit=300,  # soft limit: 5 minutes
    task_time_limit=600,       # hard limit: 10 minutes
    # Delivery guarantees
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

# Task definitions
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to: str, subject: str, body: str):
    """Send an email (with retries)."""
    try:
        import smtplib
        from email.mime.text import MIMEText

        msg = MIMEText(body, "html", "utf-8")
        msg["Subject"] = subject
        msg["To"] = to
        with smtplib.SMTP("smtp.example.com", 587) as server:
            server.starttls()
            server.login("user@example.com", "password")
            server.send_message(msg)
        return {"status": "sent", "to": to}
    except Exception as exc:
        raise self.retry(exc=exc)

@app.task(bind=True, name="tasks.process_order")
def process_order(self, order_id: str):
    """Process an order (designed to be idempotent)."""
    import time
    time.sleep(2)  # simulate slow work
    return {"order_id": order_id, "status": "processed"}

@app.task(name="tasks.cleanup_expired")
def cleanup_expired():
    """Clean up expired data."""
    import datetime
    cutoff = datetime.datetime.now() - datetime.timedelta(days=30)
    return {"cutoff": cutoff.isoformat(), "deleted": 42}
Task Orchestration: Chain, Group, Chord
from celery import chain, group, chord

@app.task
def validate_order(order_data: dict) -> dict:
    order_data["validated"] = True
    return order_data

@app.task
def deduct_inventory(order_data: dict) -> dict:
    order_data["inventory_deducted"] = True
    return order_data

@app.task
def process_payment(order_data: dict) -> dict:
    order_data["payment_processed"] = True
    return order_data

@app.task
def send_confirmation(order_data: dict) -> dict:
    print(f"Sending confirmation email: {order_data}")
    order_data["confirmation_sent"] = True
    return order_data

@app.task
def generate_report(results: list) -> dict:
    return {
        "total": len(results),
        "processed": sum(1 for r in results if r.get("status") == "processed"),
    }

# 1. Chain: run sequentially; each task's result is passed to the next
workflow = chain(
    validate_order.s({"order_id": "ORD-001", "amount": 99.9}),
    deduct_inventory.s(),
    process_payment.s(),
    send_confirmation.s(),
)
result = workflow.apply_async()

# 2. Group: run several tasks in parallel
batch_results = group(
    send_email.s("user1@test.com", "Subject", "Body"),
    send_email.s("user2@test.com", "Subject", "Body"),
    send_email.s("user3@test.com", "Subject", "Body"),
).apply_async()

# 3. Chord: parallel execution plus an aggregation callback
# Process all orders in parallel; run the callback once every task has finished
chord(
    [process_order.s(f"ORD-{i:03d}") for i in range(10)],
    generate_report.s(),
).apply_async()
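One detail worth knowing when composing chains: by default each signature receives the previous task's return value as its first argument. When a step should ignore that value, an immutable signature can be used instead. A small sketch reusing the tasks defined above:

# Immutable signatures: .si() drops the parent task's return value.
from celery import chain

pipeline = chain(
    validate_order.s({"order_id": "ORD-002", "amount": 10.0}),
    process_payment.s(),    # receives validate_order's return value
    cleanup_expired.si(),   # .si(): invoked with no arguments, previous result ignored
)
pipeline.apply_async()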
Scheduled Tasks with Celery Beat
from celery.schedules import crontab

# Add the Beat schedule to celery_config.py
app.conf.beat_schedule = {
    # Check pending orders every 10 minutes
    "check-pending-orders": {
        "task": "tasks.process_pending_orders",
        "schedule": 600,  # seconds
    },
    # Generate the daily report at 2:00 AM
    "daily-report": {
        "task": "tasks.generate_daily_report",
        "schedule": crontab(hour=2, minute=0),
    },
    # Clean up expired data every Monday at 9:00 AM
    "weekly-cleanup": {
        "task": "tasks.cleanup_expired",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),
    },
    # Send monthly statistics on the 1st of each month
    "monthly-stats": {
        "task": "tasks.send_monthly_stats",
        "schedule": crontab(hour=8, minute=0, day_of_month=1),
    },
}

@app.task(name="tasks.process_pending_orders")
def process_pending_orders():
    """Process pending orders."""
    import time
    time.sleep(1)
    return {"processed": 5, "failed": 0}

@app.task(name="tasks.generate_daily_report")
def generate_daily_report():
    """Generate the daily report."""
    return {"report_date": "2024-01-15", "total_orders": 150}

@app.task(name="tasks.send_monthly_stats")
def send_monthly_stats():
    """Send monthly statistics."""
    return {"month": "2024-01", "stats": "sent"}

# Start Beat:
# celery -A celery_config beat --loglevel=info
# Start a worker:
# celery -A celery_config worker --loglevel=info -Q default,email
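For local development it can be convenient to run Beat embedded inside a worker process instead of as a separate daemon. Beat must run as a single instance, so treat this as a development shortcut, not a production setup:

# Development only: one process acts as both worker and Beat scheduler
# celery -A celery_config worker -B --loglevel=info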
Task Monitoring and Error Handling
from celery import signals
import logging
import time

logger = logging.getLogger(__name__)

# --- Task lifecycle hooks ---
@signals.task_prerun.connect
def task_prerun_handler(sender=None, **kwargs):
    """Log before a task starts."""
    logger.info(f"Task started: {sender.name}[{kwargs.get('task_id')}]")

@signals.task_postrun.connect
def task_postrun_handler(sender=None, **kwargs):
    """Log after a task finishes."""
    logger.info(f"Task finished: {sender.name}[{kwargs.get('task_id')}], state: {kwargs.get('state')}")

@signals.task_failure.connect
def task_failure_handler(sender=None, **kwargs):
    """Alert on task failure."""
    exception = kwargs.get("exception")
    logger.error(f"Task failed: {sender.name}, exception: {exception}")
    # Send an alert notification
    # send_alert(f"Celery task failed: {sender.name}", str(exception))

# --- Task progress tracking ---
@app.task(bind=True)
def long_running_task(self, data: dict):
    """Long-running task that reports its progress."""
    items = data.get("items", [])
    total = len(items)
    for i, item in enumerate(items):
        # Simulate processing
        time.sleep(0.5)
        # Report progress
        progress = int((i + 1) / total * 100)
        self.update_state(
            state="PROGRESS",
            meta={"current": i + 1, "total": total, "percent": progress},
        )
    return {"total": total, "status": "completed"}

# Query task state
def check_task_status(task_id: str) -> dict:
    result = app.AsyncResult(task_id)
    if result.ready():
        return {"status": result.status, "result": result.result}
    elif result.state == "PROGRESS":
        return {"status": "in_progress", "meta": result.info}
    else:
        return {"status": result.state}

# --- Idempotency guard ---
@app.task(bind=True, name="tasks.idempotent_process")
def idempotent_process(self, resource_id: str):
    """Idempotent task: a Redis lock prevents duplicate execution."""
    import redis
    r = redis.Redis()
    lock_key = f"task_lock:{self.name}:{resource_id}"
    # SET NX is atomic; a separate GET followed by SET would be racy
    if not r.set(lock_key, "1", nx=True, ex=3600):  # hold the lock for 1 hour
        return {"status": "skipped", "reason": "already_processed"}
    try:
        # Business logic
        result = do_heavy_work(resource_id)
        return {"status": "success", "result": result}
    except Exception:
        r.delete(lock_key)  # release the lock on failure so the task can run again
        raise

def do_heavy_work(resource_id: str) -> dict:
    time.sleep(1)
    return {"resource_id": resource_id, "processed": True}
Integrating Celery with FastAPI
# tasks.py - task definitions (a standalone module loaded by the worker)
from celery import Celery
import time

app = Celery("worker", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

@app.task(bind=True, max_retries=3, default_retry_delay=30)
def send_email_task(self, to: str, subject: str, body: str):
    """Send an email."""
    try:
        time.sleep(2)  # simulate sending
        return {"status": "sent", "to": to}
    except Exception as exc:
        raise self.retry(exc=exc)

@app.task(bind=True)
def generate_report_task(self, report_type: str, params: dict):
    """Generate a report."""
    total_steps = params.get("steps", 10)
    for i in range(total_steps):
        time.sleep(0.5)
        self.update_state(
            state="PROGRESS",
            meta={"current": i + 1, "total": total_steps, "percent": int((i + 1) / total_steps * 100)},
        )
    return {"report_type": report_type, "status": "completed", "rows": total_steps}

# main.py - the FastAPI application
from fastapi import FastAPI
from celery.result import AsyncResult
from pydantic import BaseModel

from tasks import app as celery_app, send_email_task, generate_report_task

api = FastAPI(title="Task API")

class EmailRequest(BaseModel):
    to: str
    subject: str
    body: str

class ReportRequest(BaseModel):
    report_type: str
    steps: int = 10

@api.post("/api/tasks/email")
async def create_email_task(request: EmailRequest):
    """Submit an email task."""
    task = send_email_task.delay(request.to, request.subject, request.body)
    return {"task_id": task.id, "status": "pending"}

@api.post("/api/tasks/report")
async def create_report_task(request: ReportRequest):
    """Submit a report-generation task."""
    task = generate_report_task.delay(request.report_type, {"steps": request.steps})
    return {"task_id": task.id, "status": "pending"}

@api.get("/api/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Query task status."""
    result = AsyncResult(task_id, app=celery_app)
    response = {
        "task_id": task_id,
        "status": result.status,
    }
    if result.ready():
        response["result"] = result.result
    elif result.state == "PROGRESS":
        response["progress"] = result.info
    return response

@api.delete("/api/tasks/{task_id}")
async def revoke_task(task_id: str):
    """Cancel a task."""
    result = AsyncResult(task_id, app=celery_app)
    result.revoke(terminate=True, signal="SIGTERM")
    return {"task_id": task_id, "status": "revoked"}
Integrating Celery with Django
# proj/celery.py - Django + Celery configuration
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")
app = Celery("proj")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

# Add to proj/settings.py:
# CELERY_BROKER_URL = "redis://localhost:6379/0"
# CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
# CELERY_ACCEPT_CONTENT = ["json"]
# CELERY_TASK_SERIALIZER = "json"
# CELERY_RESULT_SERIALIZER = "json"
# CELERY_TIMEZONE = "Asia/Shanghai"

# myapp/tasks.py - tasks inside a Django app
# from celery import shared_task
#
# @shared_task(bind=True, max_retries=3)
# def process_order_task(self, order_id: int):
#     from myapp.models import Order
#     order = Order.objects.get(id=order_id)
#     # order-processing logic
#     return {"order_id": order_id, "status": "processed"}
#
# # Calling it from a view
# process_order_task.delay(order_id=123)
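Two additions worth noting. First, the official setup also imports the app in proj/__init__.py (from .celery import app as celery_app) so that shared_task binds to it when Django starts. Second, when a task depends on a row written in the current request, enqueue it only after the transaction commits, otherwise the worker may query the database before the row is visible. A sketch of that pattern, using the hypothetical Order model and task from the commented example above:

# myapp/views.py - a sketch; the Order fields and JSON shape are illustrative
from django.db import transaction
from django.http import JsonResponse

from myapp.models import Order
from myapp.tasks import process_order_task

def create_order(request):
    order = Order.objects.create(status="pending")
    # Enqueue only once the surrounding transaction has committed
    transaction.on_commit(lambda: process_order_task.delay(order.id))
    return JsonResponse({"order_id": order.id})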
Task Result Storage and Queries
import time
from celery import Celery
from celery.result import AsyncResult

app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

# Configure result expiry
app.conf.update(
    result_expires=3600,    # keep results for 1 hour
    result_extended=True,   # store extended metadata (task name, args, ...)
)

@app.task
def add(x: int, y: int) -> int:
    return x + y

@app.task(bind=True)
def chain_task(self, step: int, data: dict):
    """Example of a self-continuing task."""
    data[f"step_{step}"] = f"completed at {time.time()}"
    if step < 3:
        next_step = chain_task.signature(args=(step + 1, data))
        raise self.replace(next_step)
    return data

# Batch-query task results
def batch_check_results(task_ids: list[str]) -> list[dict]:
    """Check the state of several tasks at once."""
    results = []
    for tid in task_ids:
        result = AsyncResult(tid, app=app)
        entry = {"task_id": tid, "status": result.status}
        if result.ready():
            if result.successful():
                entry["result"] = result.result
            else:
                entry["error"] = str(result.result)
        results.append(entry)
    return results
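With result_extended=True the backend also records the task's registered name and arguments, which AsyncResult exposes once the result is stored. A small sketch (these attributes require a reasonably recent Celery, 4.3 or later):

res = add.delay(1, 2)
res.get(timeout=10)                 # wait until the result has been stored
meta = AsyncResult(res.id, app=app)
print(meta.name, meta.args)         # the registered task name and (1, 2)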
Monitoring Celery with Flower
# Install the Flower monitoring dashboard
pip install flower

# Start Flower
celery -A celery_config flower --port=5555
# Open http://localhost:5555 to view the dashboard

# Useful inspection commands
celery -A celery_config inspect active      # tasks currently executing
celery -A celery_config inspect reserved    # tasks prefetched by workers
celery -A celery_config inspect scheduled   # tasks waiting on an ETA/countdown
celery -A celery_config inspect stats       # worker statistics
celery -A celery_config inspect registered  # registered tasks
celery -A celery_config purge               # drop all pending tasks
celery -A celery_config control rate_limit tasks.email.send 10/m  # rate limiting
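The same data is available programmatically through the control/inspect API, which is handy for custom health checks. A minimal sketch against the app defined in the configuration section:

# inspect() broadcasts to workers; each call returns {worker_name: payload} or None
from celery_config import app

insp = app.control.inspect(timeout=2)
active = insp.active() or {}
for worker, tasks in active.items():
    print(f"{worker}: {len(tasks)} running task(s)")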
Celery Performance Tuning and Best Practices
# 1. Worker tuning
# celery -A celery_config worker \
#   --concurrency=8 \             # number of worker processes (defaults to the CPU count)
#   --prefetch-multiplier=1 \     # tasks prefetched per process (lower values reduce latency)
#   --max-tasks-per-child=1000 \  # recycle each child after N tasks (guards against memory leaks)
#   --time-limit=300 \            # hard time limit
#   --soft-time-limit=240 \       # soft limit (raises SoftTimeLimitExceeded)
#   -Q default,email              # queues to consume from

# 2. Task design best practices
import json
import logging

import redis

logger = logging.getLogger(__name__)

@app.task(bind=True, max_retries=3, default_retry_delay=60,
          acks_late=True, reject_on_worker_lost=True)
def robust_task(self, data: dict):
    """A robustly designed task."""
    r = redis.Redis()
    # Idempotency check: reuse the cached result if this task id already ran
    task_key = f"task_result:{self.request.id}"
    cached = r.get(task_key)
    if cached:
        return json.loads(cached)
    try:
        result = process_data(data)  # business logic, defined elsewhere
        # Cache the result to guard against duplicate execution
        r.setex(task_key, 3600, json.dumps(result))
        return result
    except (ConnectionError, TimeoutError) as exc:
        raise self.retry(exc=exc, countdown=60)
    except ValueError as exc:
        # Business errors are not retried
        logger.error(f"Business error: {exc}")
        raise

# 3. Avoid passing large objects to tasks
# Bad: passing a whole ORM object
# @app.task
# def process_user(user):  # ORM objects cannot be JSON-serialized
#     pass

# Good: pass the ID and query inside the task
@app.task
def process_user_by_id(user_id: int):
    from myapp.models import User
    user = User.objects.get(id=user_id)
    # processing logic

# 4. Dynamic workflows with task signatures
from celery import chord

# Dynamic chord: process a batch in parallel, then aggregate
@app.task
def process_batch_item(item_id: str):
    return {"item_id": item_id, "result": "processed"}

@app.task
def aggregate_results(results: list):
    return {"total": len(results), "success": sum(1 for r in results if r.get("result") == "processed")}

def submit_dynamic_batch(item_ids: list[str]):
    """Submit a batch job dynamically."""
    callback = aggregate_results.s()
    chord(process_batch_item.s(iid) for iid in item_ids)(callback)
Summary
Celery is the de facto standard for asynchronous tasks in Python, using a broker/worker architecture to run tasks asynchronously and across machines. The essentials are keeping tasks idempotent, designing sensible retry policies, and monitoring queue backlog and failure rates. For simple scenarios that do not need distributed execution, asyncio or a lighter-weight alternative may be a better fit.
Key Takeaways
- Celery architecture: Producer -> Broker (message transport) -> Worker (consumer) -> Backend (result storage)
- Task idempotency: running the same task multiple times yields the same result, enforced via unique keys or a Redis lock
- task_routes routes different tasks to different queues and workers
- Celery Beat handles periodic scheduling; make sure the Beat process itself is highly available
Production Perspective
- Keep task functions close to pure functions: accept only primitive-type arguments and avoid depending on external context
- Run a dedicated worker per queue so slow tasks cannot starve fast ones (see the command sketch after this list)
- Monitor queue backlog, task failure rates, and worker health, and set alert thresholds
- Prefer RabbitMQ as the broker in production; it offers stronger delivery guarantees than Redis
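A sketch of the per-queue worker layout referenced above, reusing the queues from the configuration section (the node names after -n are illustrative; %h expands to the hostname):

# One worker pool per queue, so slow "heavy" tasks cannot block the email queue
celery -A celery_config worker -Q email  -n email@%h  --concurrency=8
celery -A celery_config worker -Q report -n report@%h --concurrency=4
celery -A celery_config worker -Q heavy  -n heavy@%h  --concurrency=2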
Common Pitfalls
- Passing ORM or other complex objects to tasks, causing serialization failures or lost context
- Skipping idempotency design, so retries send duplicate emails or double-charge payments
- Running synchronous blocking code in workers, wasting concurrency
- Deploying Beat as a single point of failure; if it goes down, scheduled tasks silently stop running
Going Further
- Study deployment patterns for Celery on Kubernetes
- Compare workflow engines (Prefect, Dagster) with Celery
- Learn the Celery event system and the Flower dashboard
- Explore newer task queue designs built on Redis Streams
Typical Use Cases
- Time-consuming asynchronous jobs such as email, SMS notifications, and report generation
- Compute-heavy workloads such as batch data processing, file conversion, and image processing
- Scheduled jobs and periodic data synchronization
Practical Recommendations
- Pass IDs rather than objects to tasks, and re-query the data inside the worker
- Give every task sensible timeouts, retry counts, and retry intervals (see the sketch after this list)
- Monitor task execution with Flower or a dashboard of your own
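A compact way to express per-task timeout and retry policy is Celery's declarative retry options. A sketch assuming the app object from the configuration section; the task body is hypothetical:

# autoretry_for retries automatically on the listed exceptions;
# retry_backoff enables exponential backoff and retry_jitter randomizes it
@app.task(
    autoretry_for=(ConnectionError, TimeoutError),
    retry_kwargs={"max_retries": 5},
    retry_backoff=True,      # 1s, 2s, 4s, ... between attempts
    retry_backoff_max=600,   # cap the delay at 10 minutes
    retry_jitter=True,
    soft_time_limit=60,
    time_limit=120,
)
def call_flaky_service(payload: dict) -> dict:
    # hypothetical client call that may raise ConnectionError / TimeoutError
    return {"ok": True, "payload": payload}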
Troubleshooting Checklist
- Check that the broker connection is healthy and whether queues are backing up (see the sketch after this list)
- Confirm the worker processes are alive and scan their logs for exceptions
- Investigate serialization failures; confirm all task arguments are JSON-serializable
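With the Redis broker, each queue is stored as a Redis list keyed by the queue name, so backlog can be checked directly. A sketch assuming the queues defined earlier (note the default queue is named "celery" unless task_default_queue overrides it, as this article's configuration does):

# Queue depth per queue, against broker database 0
redis-cli -n 0 llen default
redis-cli -n 0 llen email
redis-cli -n 0 llen heavy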
Review Questions
- What is your task failure rate? Do you have automatic retries and alerting in place?
- What is your strategy when the queue backs up? Is there a degradation plan?
- How is task idempotency guaranteed, and what are the consequences of a message being consumed twice?
