Python Async Programming Patterns and Pitfalls
Introduction
Python's asynchronous programming support (asyncio) has become a core technology for building high-performance I/O-bound applications since the async/await syntax arrived in Python 3.5. However, the mental model of asynchronous programming differs sharply from synchronous programming, and developers easily fall into traps and anti-patterns. This article systematically covers the core patterns, common pitfalls, and best practices of Python async programming, to help developers use the technology correctly and efficiently.
Features
Core characteristics of Python async programming:
- Cooperative concurrency: cooperative multitasking driven by an event loop, not preemptive scheduling
- Single-threaded model: concurrency achieved by switching between coroutines within one thread
- I/O friendly: a natural fit for I/O-bound workloads such as network requests and database queries
- Concise syntax: async/await makes asynchronous code read almost like synchronous code
- Mature ecosystem: well-established async libraries such as aiohttp, asyncpg, and httpx
Implementation
1. Async Fundamentals and Core Concepts
1.1 Coroutine Basics

```python
import asyncio
import time

# ---- Defining and calling coroutines ----
async def simple_coroutine():
    """The simplest possible coroutine."""
    print("coroutine started")
    await asyncio.sleep(1)  # non-blocking wait
    print("coroutine finished")
    return "done"

# Three ways to run a coroutine
# Option 1: asyncio.run() (recommended, Python 3.7+)
result = asyncio.run(simple_coroutine())

# Option 2: await it inside an existing event loop
async def main():
    result = await simple_coroutine()
    print(result)

# Option 3: create tasks to run concurrently
async def concurrent_execution():
    task1 = asyncio.create_task(simple_coroutine(), name="task1")
    task2 = asyncio.create_task(simple_coroutine(), name="task2")
    # wait for all tasks to finish
    results = await asyncio.gather(task1, task2)
    return results

# ---- Task vs. Future ----
async def task_vs_future():
    """The difference between Task and Future."""
    # Task is a subclass of Future that automatically schedules a coroutine
    task = asyncio.create_task(simple_coroutine())
    # a Task exposes its name and state
    print(f"task name: {task.get_name()}")
    print(f"task done: {task.done()}")
    print(f"task cancelled: {task.cancelled()}")
    result = await task
    print(f"task result: {task.result()}")

# ---- Timeout control ----
async def with_timeout():
    """Timeout control."""
    try:
        # asyncio.wait_for imposes a timeout
        result = await asyncio.wait_for(
            asyncio.sleep(10),
            timeout=3.0,
        )
    except asyncio.TimeoutError:
        print("operation timed out")
    # Python 3.11+: asyncio.timeout() context manager
    # (it raises TimeoutError on expiry, so catch it too)
    try:
        async with asyncio.timeout(3.0):
            await asyncio.sleep(10)
    except TimeoutError:
        print("operation timed out")
```

1.2 Concurrency Patterns Compared
```python
import time

async def fetch_data(url: str, delay: float) -> str:
    """Simulate a network request."""
    await asyncio.sleep(delay)
    return f"data from {url}"

async def sequential_execution():
    """Sequential execution - total time = sum of all delays."""
    start = time.time()
    result1 = await fetch_data("api1", 2)
    result2 = await fetch_data("api2", 3)
    result3 = await fetch_data("api3", 1)
    elapsed = time.time() - start
    print(f"sequential: {elapsed:.1f}s")  # ~6s

async def concurrent_gather():
    """Concurrent execution with gather - total time = max of all delays."""
    start = time.time()
    results = await asyncio.gather(
        fetch_data("api1", 2),
        fetch_data("api2", 3),
        fetch_data("api3", 1),
    )
    elapsed = time.time() - start
    print(f"concurrent gather: {elapsed:.1f}s")  # ~3s
    return results

async def concurrent_tasks():
    """Concurrency with create_task."""
    start = time.time()
    task1 = asyncio.create_task(fetch_data("api1", 2))
    task2 = asyncio.create_task(fetch_data("api2", 3))
    task3 = asyncio.create_task(fetch_data("api3", 1))
    # other work can happen while the tasks run
    print("tasks started, free to do other work...")
    result1 = await task1
    result2 = await task2
    result3 = await task3
    elapsed = time.time() - start
    print(f"concurrent tasks: {elapsed:.1f}s")

async def task_groups():
    """Python 3.11+ TaskGroup."""
    start = time.time()
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("api1", 2))
        task2 = tg.create_task(fetch_data("api2", 3))
        task3 = tg.create_task(fetch_data("api3", 1))
    # when the TaskGroup exits, all tasks have completed;
    # if any task fails, the remaining tasks are cancelled
    elapsed = time.time() - start
    print(f"TaskGroup: {elapsed:.1f}s")
    print(f"results: {task1.result()}, {task2.result()}, {task3.result()}")
```

2. Common Anti-Patterns and Pitfalls
2.1 Blocking the Event Loop

```python
import asyncio
import time
import httpx

# ---- Anti-pattern: calling blocking operations in an async function ----
async def anti_pattern_blocking():
    """Wrong: blocks the event loop."""
    # this blocks the entire event loop!
    time.sleep(5)  # wrong! use asyncio.sleep(5) instead
    # synchronous HTTP requests also block
    # import requests
    # requests.get("https://api.example.com")  # wrong!
    # so do synchronous file operations
    # with open("large_file.txt", "r") as f:  # wrong (for large files)!
    #     data = f.read()

# ---- The right way ----
async def correct_non_blocking():
    """Correct: non-blocking operations."""
    # 1. use asyncio.sleep
    await asyncio.sleep(5)
    # 2. use an async HTTP client
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
    # 3. push blocking work to a thread pool
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(None, lambda: open("large_file.txt").read())
    # 4. use aiofiles for async file I/O
    # import aiofiles
    # async with aiofiles.open("large_file.txt", "r") as f:
    #     data = await f.read()

# ---- Anti-pattern: forgetting await ----
async def anti_pattern_forget_await():
    """Wrong: forgetting await means the coroutine never runs."""
    # wrong: this only creates a coroutine object, it never executes
    # result = fetch_data("api", 1)
    # correct: it must be awaited
    result = await fetch_data("api", 1)
    # to defer execution deliberately, use create_task
    task = asyncio.create_task(fetch_data("api", 1))
    # ... do other work ...
    result = await task

# ---- Anti-pattern: unbounded concurrency ----
async def anti_pattern_too_many_tasks():
    """Wrong: spawning too many concurrent tasks."""
    urls = [f"https://api.example.com/{i}" for i in range(10000)]
    # wrong: launching 10000 tasks at once
    # tasks = [fetch_data(url, 0.1) for url in urls]
    # await asyncio.gather(*tasks)  # may exhaust resources
    # correct: bound concurrency with a Semaphore
    sem = asyncio.Semaphore(100)
    async def limited_fetch(url):
        async with sem:
            return await fetch_data(url, 0.1)
    tasks = [limited_fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results
```

2.2 Exception-Handling Pitfalls
```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ---- Anti-pattern: swallowing exceptions ----
async def anti_pattern_swallow_exception():
    """Wrong: naive exception handling with gather."""
    async def failing_task():
        await asyncio.sleep(0.1)
        raise ValueError("task failed")
    # wrong: by default gather raises on the first exception;
    # the other tasks keep running but their results are lost
    try:
        results = await asyncio.gather(
            fetch_data("api1", 1),
            failing_task(),
            fetch_data("api2", 2),
        )
    except ValueError:
        print("a task failed, and the other results were lost")
    # correct: use return_exceptions=True
    results = await asyncio.gather(
        fetch_data("api1", 1),
        failing_task(),
        fetch_data("api2", 2),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            logger.error(f"task {i} failed: {result}")
        else:
            logger.info(f"task {i} succeeded: {result}")

# ---- Proper exception-handling patterns ----
async def proper_exception_handling():
    """Correct exception handling."""
    async def risky_operation(n: int):
        await asyncio.sleep(0.1)
        if n % 3 == 0:
            raise ValueError(f"unlucky number {n}")
        return f"result_{n}"
    # pattern 1: handle exceptions per task
    async def safe_operation(n: int):
        try:
            return await risky_operation(n)
        except ValueError as e:
            logger.warning(f"operation {n} failed: {e}")
            return None  # fall back to a default value
    tasks = [safe_operation(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    successful = [r for r in results if r is not None]
    print(f"succeeded: {len(successful)}/{len(results)}")
    # pattern 2: use TaskGroup (Python 3.11+)
    # a TaskGroup cancels all remaining tasks when any task fails
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(risky_operation(i)) for i in range(10)]
    except ExceptionGroup as eg:
        for exc in eg.exceptions:
            logger.error(f"task group exception: {exc}")

# ---- Handling cancellation ----
async def handle_cancellation():
    """Handle task cancellation correctly."""
    try:
        await asyncio.sleep(100)
    except asyncio.CancelledError:
        logger.info("task cancelled, cleaning up...")
        await asyncio.sleep(0.1)  # cleanup work
        raise  # re-raise CancelledError (recommended)
```

3. Async Context Managers and Iterators
```python
import asyncio

# ---- Async context manager ----
class AsyncDatabaseConnection:
    """Async database connection (illustrative)."""
    def __init__(self, dsn: str):
        self.dsn = dsn
        self._connected = False

    async def __aenter__(self):
        """Enter the async context."""
        await asyncio.sleep(0.1)  # simulate establishing a connection
        self._connected = True
        print(f"connected: {self.dsn}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the async context."""
        await asyncio.sleep(0.05)  # simulate closing the connection
        self._connected = False
        print(f"disconnected: {self.dsn}")
        return False  # do not suppress exceptions

    async def execute(self, query: str):
        if not self._connected:
            raise RuntimeError("not connected")
        await asyncio.sleep(0.01)  # simulate running the query
        return [{"id": 1, "name": "test"}]

# usage
async def use_async_context():
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as conn:
        result = await conn.execute("SELECT * FROM users")
        print(result)
    # the connection is closed automatically

# ---- Async iterators and generators ----
class AsyncPaginator:
    """Async pagination iterator."""
    def __init__(self, api_client, endpoint: str, page_size: int = 100):
        self.client = api_client
        self.endpoint = endpoint
        self.page_size = page_size
        self.current_page = 0
        self.has_more = True

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.has_more:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # simulate an API call
        self.current_page += 1
        # fake data
        items = [{"id": i, "page": self.current_page}
                 for i in range(self.page_size)]
        if self.current_page >= 5:  # simulate 5 pages of data
            self.has_more = False
        return items

async def use_async_iterator():
    """Consume the async iterator."""
    paginator = AsyncPaginator(None, "/api/items")
    # async for loop
    async for page in paginator:
        print(f"fetched {len(page)} records")
    # async comprehension
    # all_items = [item async for page in paginator for item in page]

# ---- Async generator ----
async def async_range(start: int, end: int, delay: float = 0.1):
    """Async generator."""
    for i in range(start, end):
        await asyncio.sleep(delay)
        yield i

async def use_async_generator():
    """Consume the async generator."""
    # async for loop
    async for num in async_range(1, 10):
        print(f"yielded: {num}")
    # collect into a list
    values = []
    async for num in async_range(1, 10):
        values.append(num)
    print(f"all values: {values}")
```

4. Producer-Consumer Pattern
```python
import asyncio
import random
from dataclasses import dataclass

@dataclass
class WorkItem:
    """A unit of work."""
    id: int
    data: str
    priority: int = 0

class AsyncProducerConsumer:
    """Async producer-consumer pattern."""
    def __init__(
        self,
        queue_size: int = 100,
        num_producers: int = 2,
        num_consumers: int = 3,
    ):
        self.queue: asyncio.Queue[WorkItem] = asyncio.Queue(maxsize=queue_size)
        self.num_producers = num_producers
        self.num_consumers = num_consumers
        self.results: list[dict] = []
        self.stop_event = asyncio.Event()

    async def producer(self, producer_id: int):
        """Producer."""
        for i in range(20):
            if self.stop_event.is_set():
                break
            item = WorkItem(
                id=i,
                data=f"data_{producer_id}_{i}",
                priority=random.randint(1, 5),
            )
            # enqueue with a timeout
            try:
                await asyncio.wait_for(
                    self.queue.put(item),
                    timeout=5.0,
                )
                print(f"producer {producer_id}: enqueued {item.data}")
            except asyncio.TimeoutError:
                print(f"producer {producer_id}: queue full, timed out")
            await asyncio.sleep(random.uniform(0.01, 0.1))
        print(f"producer {producer_id}: finished")

    async def consumer(self, consumer_id: int):
        """Consumer."""
        while not self.stop_event.is_set():
            try:
                item = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0,
                )
            except asyncio.TimeoutError:
                continue
            try:
                # process the work item
                result = await self._process_item(item, consumer_id)
                self.results.append(result)
                print(f"consumer {consumer_id}: processed {item.data}")
            except Exception as e:
                print(f"consumer {consumer_id}: failed on {item.data} - {e}")
            finally:
                self.queue.task_done()
        print(f"consumer {consumer_id}: exiting")

    async def _process_item(self, item: WorkItem, consumer_id: int) -> dict:
        """Process a single work item."""
        processing_time = random.uniform(0.05, 0.2)
        await asyncio.sleep(processing_time)
        if random.random() < 0.05:  # simulate a 5% failure rate
            raise RuntimeError(f"error while processing {item.id}")
        return {
            "item_id": item.id,
            "consumer_id": consumer_id,
            "processing_time": processing_time,
        }

    async def run(self):
        """Run the producers and consumers."""
        producers = [
            asyncio.create_task(self.producer(i))
            for i in range(self.num_producers)
        ]
        consumers = [
            asyncio.create_task(self.consumer(i))
            for i in range(self.num_consumers)
        ]
        # wait for all producers to finish
        await asyncio.gather(*producers)
        # wait for the queue to drain
        await self.queue.join()
        # stop the consumers
        self.stop_event.set()
        await asyncio.gather(*consumers, return_exceptions=True)
        print(f"\ntotal results processed: {len(self.results)}")
        return self.results

# priority-queue variant
class AsyncPriorityProducerConsumer:
    """Priority-based async producer-consumer."""
    def __init__(self, max_workers: int = 3):
        self.queue: asyncio.PriorityQueue[tuple[int, int, WorkItem]] = asyncio.PriorityQueue()
        self.max_workers = max_workers

    async def submit(self, item: WorkItem):
        """Submit a work item (a lower priority value is served first).
        item.id is included as a tiebreaker so WorkItem itself is never compared."""
        await self.queue.put((item.priority, item.id, item))

    async def worker(self, worker_id: int):
        """Worker."""
        while True:
            priority, _, item = await self.queue.get()
            try:
                await asyncio.sleep(0.1)  # simulate processing
                print(f"worker {worker_id}: handled priority {priority} item {item.data}")
            finally:
                self.queue.task_done()
```

5. Async HTTP Clients
```python
import asyncio
import httpx

# ---- httpx async client ----
async def basic_httpx():
    """Basic httpx usage."""
    async with httpx.AsyncClient(
        base_url="https://httpbin.org",
        timeout=httpx.Timeout(10.0),
        limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    ) as client:
        # GET request
        response = await client.get("/get", params={"key": "value"})
        data = response.json()
        # POST request
        response = await client.post("/post", json={"name": "test"})
        # concurrent requests
        responses = await asyncio.gather(
            client.get("/delay/1"),
            client.get("/delay/2"),
            client.get("/delay/1"),
        )

async def robust_http_client():
    """HTTP client with retries and error handling."""
    class RetryableClient:
        def __init__(
            self,
            max_retries: int = 3,
            retry_delay: float = 1.0,
            backoff_factor: float = 2.0,
        ):
            self.max_retries = max_retries
            self.retry_delay = retry_delay
            self.backoff_factor = backoff_factor

        async def request(
            self,
            method: str,
            url: str,
            **kwargs,
        ) -> httpx.Response:
            last_error = None
            delay = self.retry_delay
            for attempt in range(self.max_retries):
                try:
                    async with httpx.AsyncClient() as client:
                        response = await client.request(method, url, **kwargs)
                        response.raise_for_status()
                        return response
                except httpx.TimeoutException as e:
                    last_error = e
                    print(f"  timeout (attempt {attempt + 1}/{self.max_retries})")
                except httpx.HTTPStatusError as e:
                    if e.response.status_code < 500:
                        raise  # do not retry 4xx responses
                    last_error = e
                    print(f"  server error {e.response.status_code} (attempt {attempt + 1})")
                except httpx.RequestError as e:
                    last_error = e
                    print(f"  request error (attempt {attempt + 1})")
                await asyncio.sleep(delay)
                delay *= self.backoff_factor
            raise last_error or Exception("all retries failed")

    client = RetryableClient(max_retries=3)
    response = await client.request("GET", "https://httpbin.org/get")
    print(response.json())

# ---- aiohttp ----
async def aiohttp_example():
    """Basic aiohttp usage."""
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/get") as resp:
            data = await resp.json()
            print(data)
        # POST JSON
        async with session.post(
            "https://httpbin.org/post",
            json={"key": "value"},
        ) as resp:
            data = await resp.json()
        # streamed download
        async with session.get("https://httpbin.org/stream/10") as resp:
            async for line in resp.content:
                print(line.decode())
```

6. Async Database Access
```python
import asyncio

# ---- asyncpg (async PostgreSQL driver) ----
async def asyncpg_example():
    """asyncpg usage example."""
    import asyncpg
    # create a connection pool
    pool = await asyncpg.create_pool(
        dsn="postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
        command_timeout=30,
    )
    async with pool.acquire() as conn:
        # create the table
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                email VARCHAR(255) UNIQUE NOT NULL,
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
        # insert a row
        user_id = await conn.fetchval(
            "INSERT INTO users(name, email) VALUES($1, $2) RETURNING id",
            "Zhang San", "zhangsan@example.com"
        )
        # bulk insert
        users = [
            ("Li Si", "lisi@example.com"),
            ("Wang Wu", "wangwu@example.com"),
            ("Zhao Liu", "zhaoliu@example.com"),
        ]
        await conn.executemany(
            "INSERT INTO users(name, email) VALUES($1, $2) ON CONFLICT DO NOTHING",
            users,
        )
        # fetch a single row
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
        print(dict(user))
        # fetch multiple rows
        all_users = await conn.fetch("SELECT * FROM users ORDER BY id")
        for row in all_users:
            print(dict(row))
    await pool.close()

# ---- aiomysql ----
async def aiomysql_example():
    """aiomysql usage example."""
    import aiomysql
    pool = await aiomysql.create_pool(
        host="localhost",
        port=3306,
        user="root",
        password="password",
        db="mydb",
        minsize=5,
        maxsize=20,
        autocommit=True,
    )
    async with pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute("SELECT * FROM users LIMIT 10")
            rows = await cur.fetchall()
            for row in rows:
                print(row)
    pool.close()
    await pool.wait_closed()

# ---- SQLAlchemy 2.0 async ----
async def sqlalchemy_async_example():
    """SQLAlchemy 2.0 async usage."""
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
    from sqlalchemy.ext.asyncio import async_sessionmaker
    from sqlalchemy import select, text
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

    class Base(DeclarativeBase):
        pass

    class User(Base):
        __tablename__ = "users"
        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str] = mapped_column()
        email: Mapped[str] = mapped_column()

    engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/mydb",
        pool_size=20,
        max_overflow=10,
    )
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    async with async_session() as session:
        # query
        result = await session.execute(
            select(User).where(User.name.like("%Zhang%"))
        )
        users = result.scalars().all()
        # insert
        new_user = User(name="new_user", email="new@example.com")
        session.add(new_user)
        await session.commit()
    await engine.dispose()
```

7. Sync vs. Async Performance
```python
import asyncio
import time
import httpx

async def benchmark_async_vs_sync():
    """Compare async and sync performance."""
    urls = [f"https://httpbin.org/delay/{i % 3 + 1}" for i in range(30)]

    # synchronous version
    def sync_fetch():
        start = time.time()
        import requests
        session = requests.Session()
        for url in urls:
            session.get(url)
        return time.time() - start

    # asynchronous version
    async def async_fetch():
        start = time.time()
        sem = asyncio.Semaphore(10)
        async def fetch_one(client, url):
            async with sem:
                return await client.get(url)
        async with httpx.AsyncClient() as client:
            tasks = [fetch_one(client, url) for url in urls]
            await asyncio.gather(*tasks)
        return time.time() - start

    # asynchronous version with a connection pool
    async def async_fetch_pooled():
        start = time.time()
        async with httpx.AsyncClient(
            limits=httpx.Limits(max_connections=20)
        ) as client:
            sem = asyncio.Semaphore(20)
            async def fetch_one(url):
                async with sem:
                    return await client.get(url)
            await asyncio.gather(*[fetch_one(url) for url in urls])
        return time.time() - start

    print("running sync version...")
    sync_time = sync_fetch()
    print(f"sync: {sync_time:.1f}s")
    print("running async version...")
    async_time = await async_fetch()
    print(f"async: {async_time:.1f}s")
    print("running async (pooled) version...")
    async_pooled_time = await async_fetch_pooled()
    print(f"async (pooled): {async_pooled_time:.1f}s")
    print(f"\nspeedup: {sync_time/async_time:.1f}x (async)")
    print(f"speedup: {sync_time/async_pooled_time:.1f}x (async + pool)")
```

8. Debugging Async Code
```python
import asyncio
import logging

# ---- Debug mode ----
async def enable_debug_mode():
    """Enable asyncio debug mode."""
    # option 1: via the event loop
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    loop.slow_callback_duration = 0.1  # warn on callbacks slower than 100ms
    # option 2: via the environment variable PYTHONASYNCIODEBUG=1
    # option 3: pass debug=True to asyncio.run(main(), debug=True)

# ---- Custom debugging helpers ----
class AsyncTaskMonitor:
    """Async task monitor."""
    def __init__(self):
        self.tasks_info: dict[int, dict] = {}

    def monitor_tasks(self):
        """Inspect all tasks."""
        tasks = asyncio.all_tasks()
        print(f"\ncurrent task count: {len(tasks)}")
        for task in tasks:
            print(f"  - {task.get_name()}: "
                  f"done={task.done()}, "
                  f"cancelled={task.cancelled()}")

    def track_task(self, task: asyncio.Task):
        """Track a task's lifecycle."""
        import traceback
        created_stack = traceback.format_stack()

        def done_callback(t: asyncio.Task):
            if t.cancelled():
                print(f"task {t.get_name()} was cancelled")
            elif t.exception():
                print(f"task {t.get_name()} raised: {t.exception()}")
            else:
                print(f"task {t.get_name()} finished: {t.result()}")

        task.add_done_callback(done_callback)

# ---- aiomonitor integration ----
async def with_aiomonitor():
    """Monitor async tasks with aiomonitor."""
    # pip install aiomonitor
    # import aiomonitor
    #
    # with aiomonitor.start_monitor(loop=asyncio.get_running_loop()):
    #     await main_logic()
    #
    # then connect to the monitoring console via: telnet localhost 20101

    # hand-rolled minimal monitor
    while True:
        tasks = asyncio.all_tasks()
        print(f"[monitor] active tasks: {len(tasks)}")
        for task in tasks:
            if not task.done() and not task.get_name().startswith("monitor"):
                print(f"  running: {task.get_name()}")
        await asyncio.sleep(5)
```

9. Signals and Graceful Shutdown
```python
import asyncio
import signal
import os

class AsyncApplication:
    """An async application with graceful shutdown."""
    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.tasks: list[asyncio.Task] = []

    async def start(self):
        """Start the application."""
        print(f"application started, PID: {os.getpid()}")
        # register signal handlers (add_signal_handler is Unix-only)
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, self._signal_handler, sig)
        # launch worker tasks
        for i in range(3):
            task = asyncio.create_task(
                self._worker(i), name=f"worker_{i}"
            )
            self.tasks.append(task)
        # wait for the shutdown signal
        await self.shutdown_event.wait()
        # shut down gracefully
        await self._graceful_shutdown()

    def _signal_handler(self, sig):
        """Signal handler."""
        print(f"\nreceived signal {sig.name}, starting graceful shutdown...")
        self.shutdown_event.set()

    async def _worker(self, worker_id: int):
        """Worker task."""
        while not self.shutdown_event.is_set():
            try:
                await asyncio.sleep(1)
                print(f"worker {worker_id}: working...")
            except asyncio.CancelledError:
                print(f"worker {worker_id}: received cancellation")
                break

    async def _graceful_shutdown(self):
        """Graceful shutdown."""
        print("shutting down...")
        # cancel all tasks
        for task in self.tasks:
            task.cancel()
        # wait for them to finish
        results = await asyncio.gather(*self.tasks, return_exceptions=True)
        for task, result in zip(self.tasks, results):
            if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
                print(f"task {task.get_name()} raised: {result}")
        print("all tasks shut down")
```

Advantages
- High concurrency: a single thread can handle thousands of concurrent connections
- Low resource usage: coroutines are far lighter than threads, with no thread-switching overhead
- Readability: async/await syntax reads almost like synchronous code
- Fine-grained control: timeouts, cancellation, priorities, and more
- Rich ecosystem: mature async libraries for databases, HTTP, files, and more
Disadvantages
- Learning curve: the async mental model differs sharply from synchronous programming
- Virality: once a function is async, everything up the call chain tends to become async too
- Harder debugging: async call stacks are more complex, making troubleshooting harder
- Blocking risk: a single blocking call stalls the entire event loop
- Library compatibility: async-capable libraries are required; synchronous libraries cannot simply be mixed in
Performance Notes
- Pick a sensible concurrency level: bound concurrency with a Semaphore; 100-500 is often a reasonable range
- Reuse connections: use connection pools for both HTTP and databases to avoid repeated handshakes
- Keep the event loop unblocked: use async versions of all I/O, and push CPU-bound work to a thread pool
- Avoid excessive task creation: every task has overhead; use gather and TaskGroup judiciously
- Watch for slow callbacks: set slow_callback_duration to detect operations that block the event loop
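To illustrate the thread-pool point above, here is a minimal sketch using `asyncio.to_thread` (Python 3.9+). The repeated-hashing workload, its function name, and the iteration count are made up for demonstration; any blocking call could take its place.

```python
import asyncio
import hashlib

def blocking_hash(data: bytes, rounds: int = 200_000) -> str:
    # CPU-heavy work that would stall the event loop if called
    # directly from a coroutine
    digest = data
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()

async def main() -> list[str]:
    # asyncio.to_thread runs the blocking function in the default
    # thread pool, so the event loop stays responsive
    results = await asyncio.gather(
        asyncio.to_thread(blocking_hash, b"payload-1"),
        asyncio.to_thread(blocking_hash, b"payload-2"),
    )
    return results

hashes = asyncio.run(main())
print(len(hashes))
```

`asyncio.to_thread` is shorthand for `loop.run_in_executor(None, ...)` with keyword-argument support; for process-level parallelism on truly CPU-bound work, a `ProcessPoolExecutor` passed to `run_in_executor` is the usual next step.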
Summary
Python async programming is a technology that rewards deep understanding:
- Core concepts: understand how the event loop, coroutines, and Tasks relate and differ
- Avoid blocking: make sure every I/O operation uses an async implementation
- Exception handling: use return_exceptions=True and proper try/except patterns
- Resource control: limit concurrency with Semaphore and reuse connections via pools
- Graceful shutdown: implement signal handling and task-cancellation logic
Key Concepts
| Concept | Description |
|---|---|
| Event Loop | The core of async scheduling; runs coroutines and callbacks |
| Coroutine | A function defined with async def; must be awaited |
| Task | A wrapper around a coroutine that supports cancellation, state queries, and callbacks |
| Semaphore | A synchronization primitive that caps the number of concurrent tasks |
| Queue | An async queue for producer-consumer patterns |
| gather | Runs multiple coroutines concurrently and waits for all of them |
| TaskGroup | Python 3.11+ task group for safer multi-task management |
Common Misconceptions
Misconception: async is always faster than sync
- Async offers no advantage for CPU-bound work and can even be slower
- Fix: use async only for I/O-bound workloads
Misconception: adding more awaits improves performance
- await only yields control; it does not parallelize anything by itself
- Fix: combine it with create_task or gather to get real concurrency
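A tiny timing sketch of that point: awaiting two coroutines one after the other sums their latencies, while gather overlaps them. The 0.1 s sleep stands in for I/O.

```python
import asyncio
import time

async def io_op() -> None:
    await asyncio.sleep(0.1)  # simulated I/O

async def sequential() -> float:
    start = time.perf_counter()
    await io_op()  # each await runs to completion before the next starts
    await io_op()
    return time.perf_counter() - start

async def concurrent() -> float:
    start = time.perf_counter()
    await asyncio.gather(io_op(), io_op())  # the two sleeps overlap
    return time.perf_counter() - start

seq = asyncio.run(sequential())
conc = asyncio.run(concurrent())
print(f"sequential: {seq:.2f}s, gather: {conc:.2f}s")
```

Sequentially the total is roughly 0.2 s; with gather it is roughly 0.1 s, because await alone never creates concurrency.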
Misconception: asyncio.Queue works like queue.Queue
- They are not interchangeable; asyncio.Queue operations must be awaited
- Fix: use asyncio.Queue consistently in async code
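A small sketch of the awaited queue API, contrasting it with queue.Queue's blocking calls; the values are arbitrary.

```python
import asyncio

async def demo() -> list[int]:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    await q.put(1)   # put/get are coroutines and must be awaited
    q.put_nowait(2)  # non-blocking variant; raises asyncio.QueueFull when full
    first = await q.get()
    second = await q.get()
    q.task_done()
    q.task_done()
    return [first, second]

items = asyncio.run(demo())
print(items)  # → [1, 2]
```

A queue.Queue `get()` inside a coroutine would block the whole event loop instead of suspending just that coroutine, which is why the two types cannot be mixed.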
Misconception: async code never needs locks
- Multiple coroutines touching shared state still need synchronization
- Fix: protect shared state with asyncio.Lock
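A runnable sketch of why: the unsafe read-modify-write below loses updates whenever the scheduler interleaves coroutines at the await point, while asyncio.Lock serializes the critical section. The Counter class is a contrived example.

```python
import asyncio

class Counter:
    """Shared state mutated across an await point."""
    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment_unsafe(self) -> None:
        current = self.value
        await asyncio.sleep(0)    # suspension point: other coroutines run here
        self.value = current + 1  # lost update if interleaved

    async def increment_safe(self) -> None:
        async with self.lock:     # one coroutine at a time in the critical section
            current = self.value
            await asyncio.sleep(0)
            self.value = current + 1

async def run_all(method) -> int:
    counter = Counter()
    await asyncio.gather(*(method(counter) for _ in range(100)))
    return counter.value

unsafe = asyncio.run(run_all(Counter.increment_unsafe))
safe = asyncio.run(run_all(Counter.increment_safe))
print(unsafe, safe)
```

Without the lock, every coroutine reads the old value before any of them writes, so most increments vanish; with the lock the count reaches 100.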
Learning Path
- Beginner: master async/await syntax and understand the event loop
- Intermediate: use async HTTP and database clients; build async web services
- Advanced: implement complex async patterns (producer-consumer, streaming, rate limiting)
- Expert: tune event-loop performance, debug complex async issues, design async architectures
When to Use It
- High-concurrency web API services
- Web crawlers and data scraping
- Real-time message processing (WebSocket, chat services)
- Async task queues and background jobs
- High-volume calls between microservices
- Streaming data processing
Adoption Advice
- Migrate incrementally: convert the most I/O-heavy parts first, then expand gradually
- Standardize on one async framework: pick a single async web framework (FastAPI/aiohttp) for the project
- Monitoring first: set up task monitoring and slow-callback detection before deploying
- Code review: focus on blocking operations and never-awaited coroutines
- Benchmark: establish async and sync performance baselines to quantify the gains
Troubleshooting Checklist
| Symptom | Likely cause | Fix |
|---|---|---|
| RuntimeWarning: coroutine was never awaited | A coroutine was never awaited | Check that every async call is awaited |
| Event loop hangs | A blocking call inside async code | Use run_in_executor or switch to an async library |
| Task was destroyed but it is pending | A task was garbage-collected before finishing | Make sure every task is awaited or cancelled |
| Connection leaks | async with contexts not closed properly | Manage resources with async with |
| Steadily growing memory | Task backlog from unbounded concurrency | Add a Semaphore to cap concurrency |
| High latency | Blocking DNS resolution or no connection reuse | Use connection pools and cache DNS |
Retrospective Questions
- How much did QPS improve after the async migration? Did P99 latency improve?
- Are there operations that block the event loop? How often does slow_callback_duration fire warnings?
- Is the connection pool sized sensibly? How often do connection waits time out?
- How does the async code's error rate compare with the synchronous version?
- Is run_in_executor (the thread pool) overused? Could those calls be replaced with async libraries?
