Python 异步进阶
阅读时长大约 11 分钟 · 约 3425 字
Python 异步进阶
简介
Python 异步进阶涵盖 asyncio 事件循环的高级用法、协程调度策略、异步上下文管理和高并发模式设计。在掌握 async/await 基础语法之后,需要理解事件循环机制、任务编排、超时控制、信号量限流等核心概念,才能构建稳定可靠的异步服务。
asyncio 的核心抽象是"事件循环"(Event Loop)——一个运行在单线程中的无限循环,不断检查哪些 I/O 操作已经完成,然后恢复对应的协程执行。当协程遇到 I/O 操作(网络请求、文件读写、数据库查询)时,它会"挂起"(yield control),让事件循环去执行其他协程。当 I/O 完成后,事件循环通过回调恢复该协程。这种协作式多任务(cooperative multitasking)的特点是:协程主动让出控制权,不存在线程切换的开销,但也意味着一个协程如果长时间占用 CPU,其他所有协程都会被阻塞。
特点
事件循环深入理解
事件循环的生命周期
import asyncio

# Option 1: asyncio.run() -- the most common entry point; it creates
# and closes the event loop automatically.
async def main():
    """Sleep one second, then report completion."""
    await asyncio.sleep(1)
    print("完成")

# Option 2: manual event-loop management (advanced scenarios only).
async def legacy_main():
    """Placeholder coroutine driven by a manually managed loop."""
    await asyncio.sleep(1)

if __name__ == "__main__":
    # Recommended: asyncio.run() manages the loop lifecycle for us.
    asyncio.run(main())

    # Manual lifecycle: create, install, run, then shut down async
    # generators before closing so they are finalized cleanly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(legacy_main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
# Option 3: inspect the currently running event loop.
async def get_current_loop():
    """Print the running loop object and whether it is active."""
    loop = asyncio.get_running_loop()
    print(f"当前事件循环: {loop}")
    print(f"是否在运行: {loop.is_running()}")

# Option 4: schedule work on an already-running loop.
async def schedule_tasks():
    """Demonstrate create_task, gather, call_soon and call_later."""
    # create_task schedules the coroutine on the loop immediately.
    task1 = asyncio.create_task(asyncio.sleep(1))
    task2 = asyncio.create_task(asyncio.sleep(2))
    # Wait for both tasks to finish.
    await asyncio.gather(task1, task2)
    print("所有任务完成")
    # call_soon -- schedule a plain (synchronous) callback.
    loop = asyncio.get_running_loop()
    loop.call_soon(print, "这是一个同步回调")
    # call_later -- schedule a callback after a delay.
    loop.call_later(1.0, print, "1秒后执行")
    await asyncio.sleep(2)

# --- Common event-loop pitfalls ---
import asyncio
import time

# Pitfall 1: calling a blocking function inside an async def.
async def bad_example():
    """Show the non-blocking alternative to time.sleep()."""
    # time.sleep(1)  # Wrong! Blocks the whole event loop for 1 second.
    await asyncio.sleep(1)  # Right: suspends only this coroutine.

# Pitfall 2: forgetting to await.
async def missing_await():
    """A coroutine object is inert until awaited or scheduled."""
    # asyncio.sleep(1)  # Wrong! Creates a coroutine but never runs it.
    # Correct:
    await asyncio.sleep(1)

# Pitfall 3: creating a conflicting event loop from sync code.
def sync_function():
    """asyncio.run() raises RuntimeError if a loop is already running."""
    # asyncio.run(async_function())  # Fails when called inside a loop.
    pass
# Solution: run blocking functions in an executor so the event loop
# stays responsive.
async def run_blocking_code():
    """Offload blocking calls to a thread pool via run_in_executor."""
    loop = asyncio.get_running_loop()
    # Option 1: the loop's default thread pool (executor=None).
    result = await loop.run_in_executor(None, time.sleep, 1)
    print("阻塞函数执行完毕")
    # Option 2: a dedicated thread pool.
    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(
            pool,
            lambda: heavy_computation(1000)
        )
    return result

def heavy_computation(n: int) -> int:
    """CPU-bound work: sum of squares of 0..n-1."""
    return sum(i * i for i in range(n))
# Pitfall 4: never use requests (a synchronous HTTP library) inside
# async code -- use aiohttp or httpx instead.
async def fetch_data_async():
    """Fetch JSON with an async HTTP client (httpx)."""
    # Wrong: requests.get("https://api.example.com")  # blocks the loop!
    # Correct:
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://httpbin.org/get")
        return resp.json()

# --- Implementation ---
高级任务编排与并发控制
import asyncio
from typing import Any
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """Perform one async HTTP GET with a 10-second total timeout."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        return {"url": url, "status": resp.status, "data": await resp.text()}

async def fetch_with_semaphore(urls: list[str], max_concurrent: int = 5):
    """Fetch all urls while capping concurrency with a semaphore."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_fetch(session, url):
        # The semaphore admits at most max_concurrent requests at once.
        async with semaphore:
            print(f"开始请求: {url}")
            try:
                return await fetch_url(session, url)
            except Exception as e:
                # Per-URL errors are returned as data, not raised.
                return {"url": url, "error": str(e)}

    async with aiohttp.ClientSession() as session:
        tasks = [limited_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
async def fetch_with_timeout(urls: list[str], timeout: float = 5.0):
    """Batch-fetch urls, cancelling any request that exceeds `timeout`.

    Returns a (success, failures) pair; failures are the raised
    exceptions collected by gather(return_exceptions=True).
    """
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.wait_for(fetch_url(session, url), timeout=timeout)
            for url in urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    success = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    print(f"成功: {len(success)}, 失败: {len(failures)}")
    return success, failures

# --- Task orchestration in depth ---
import asyncio

async def task_a():
    """Succeed after 1 second."""
    await asyncio.sleep(1)
    return "A 完成"

async def task_b():
    """Succeed after 2 seconds."""
    await asyncio.sleep(2)
    return "B 完成"

async def task_c():
    """Fail with ValueError after 0.5 seconds."""
    await asyncio.sleep(0.5)
    raise ValueError("C 失败")
# 1. gather -- run concurrently, wait for all of them.
async def demo_gather():
    """Collect results of task_a and task_b in call order."""
    results = await asyncio.gather(task_a(), task_b())
    print(results)  # ["A 完成", "B 完成"]

# gather can collect exceptions instead of aborting.
async def demo_gather_exceptions():
    """With return_exceptions=True, failures come back as values."""
    results = await asyncio.gather(
        task_a(), task_b(), task_c(),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"异常: {r}")
        else:
            print(f"结果: {r}")
# 2. wait -- a more flexible waiting primitive.
async def demo_wait():
    """Stop waiting after 1.5s and cancel whatever is unfinished."""
    tasks = {asyncio.create_task(task_a()), asyncio.create_task(task_b())}
    done, pending = await asyncio.wait(tasks, timeout=1.5)
    print(f"已完成: {len(done)}, 未完成: {len(pending)}")
    # Cancel the tasks that did not finish in time.
    for task in pending:
        task.cancel()

# 3. FIRST_COMPLETED / FIRST_EXCEPTION / ALL_COMPLETED
async def demo_wait_first():
    """Return as soon as the first task completes; cancel the rest."""
    tasks = [asyncio.create_task(task_a()), asyncio.create_task(task_b())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    result = done.pop().result()
    print(f"第一个完成: {result}")
    for task in pending:
        task.cancel()

# 4. as_completed -- consume results in completion order.
async def demo_as_completed():
    """Print each result as soon as its task finishes."""
    tasks = [asyncio.create_task(task_a()), asyncio.create_task(task_b())]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"完成: {result}")
# 5. shield -- protect a task from outside cancellation.
async def demo_shield():
    """wait_for times out, but the shielded work keeps running."""
    async def critical_operation():
        await asyncio.sleep(2)
        return "关键操作完成"
    try:
        result = await asyncio.wait_for(
            asyncio.shield(critical_operation()),
            timeout=1.0
        )
    except asyncio.TimeoutError:
        print("超时了,但关键操作仍在后台运行")
    # Note: the shielded coroutine itself is not cancelled.

# --- Async producer-consumer pattern ---
import asyncio
import random
from typing import AsyncGenerator

async def data_producer(queue: asyncio.Queue, count: int):
    """Produce `count` items, then one None sentinel per consumer."""
    for i in range(count):
        item = {"id": i, "value": random.randint(1, 100)}
        await queue.put(item)
        print(f"生产: {item}")
        await asyncio.sleep(random.uniform(0.01, 0.05))
    # Send one termination sentinel for each consumer.
    for _ in range(3):  # number of consumers
        await queue.put(None)

async def data_consumer(queue: asyncio.Queue, consumer_id: int, results: list):
    """Consume items until the None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        # Simulate per-item processing latency.
        await asyncio.sleep(random.uniform(0.02, 0.08))
        processed = {**item, "consumer": consumer_id, "doubled": item["value"] * 2}
        results.append(processed)
        print(f"消费者 {consumer_id} 处理: {item['id']}")
        queue.task_done()
async def run_pipeline():
    """Wire one producer to three consumers through a bounded queue."""
    queue = asyncio.Queue(maxsize=50)
    results = []
    producer = asyncio.create_task(data_producer(queue, count=20))
    consumers = [
        asyncio.create_task(data_consumer(queue, cid, results))
        for cid in range(3)
    ]
    await producer
    # Consumers exit once they each receive a None sentinel.
    await asyncio.gather(*consumers)
    print(f"\n处理完成,共 {len(results)} 条")
    for r in sorted(results, key=lambda x: x["id"])[:5]:
        print(f" ID={r['id']}, value={r['value']}, doubled={r['doubled']}")

# asyncio.run(run_pipeline())
# --- Async retry, caching and circuit breaking ---
import asyncio
import time
from functools import wraps
from typing import Callable, Any

def async_retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Decorator that retries an async callable with exponential backoff.

    Args:
        max_attempts: total number of attempts before giving up.
        delay: sleep before the first retry, in seconds.
        backoff: multiplier applied to the delay after each failure.

    Raises:
        The last exception, once all attempts are exhausted.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            current_delay = delay
            last_exception = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts:
                        print(f"[重试 {attempt}/{max_attempts}] {func.__name__}: {e}")
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff
            # Every attempt failed -- surface the last exception.
            raise last_exception
        return wrapper
    return decorator
class CircuitBreaker:
    """Async circuit breaker with closed / open / half_open states.

    After `failure_threshold` consecutive failures the breaker opens and
    rejects calls; once `recovery_timeout` seconds have passed, a single
    probe call is allowed through (half_open) and, if it succeeds, the
    breaker closes again and the failure counter resets.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half_open

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Invoke `func(*args, **kwargs)` through the breaker.

        Raises RuntimeError while the breaker is open; otherwise
        re-raises whatever `func` raises after recording the failure.
        """
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                # Cool-down elapsed: allow a single probe request.
                self.state = "half_open"
            else:
                raise RuntimeError("熔断器开启,请求被拒绝")
        try:
            result = await func(*args, **kwargs)
            if self.state == "half_open":
                # Probe succeeded: close the breaker and reset counters.
                self.state = "closed"
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
                print(f"熔断器触发: 连续 {self.failure_count} 次失败")
            raise
# Usage example: retry on the outside, circuit breaker on the inside.
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=10.0)

@async_retry(max_attempts=2, delay=0.5)
async def call_api(endpoint: str):
    """Call the endpoint through the shared circuit breaker."""
    return await breaker.call(fetch_data, endpoint)

async def fetch_data(endpoint: str):
    """Simulated downstream call that always succeeds."""
    await asyncio.sleep(0.1)
    return {"endpoint": endpoint, "data": "ok"}

# --- Structured concurrency (Python 3.11+ TaskGroup) ---
import asyncio
from typing import Any
async def process_item(item_id: int) -> dict:
"""处理单个项目"""
await asyncio.sleep(0.1)
if item_id == 5:
raise ValueError(f"项目 {item_id} 处理失败")
return {"id": item_id, "result": item_id * 10}
async def structured_concurrent_processing():
"""使用 TaskGroup 实现结构化并发"""
results = []
try:
async with asyncio.TaskGroup() as tg:
tasks = {
tg.create_task(process_item(i), name=f"task-{i}")
for i in range(10)
}
except* ValueError as eg:
print(f"部分任务失败: {eg.exceptions}")
for exc in eg.exceptions:
print(f" 错误: {exc}")
for task in tasks:
if task.done() and not task.cancelled() and task.exception() is None:
results.append(task.result())
print(f"成功处理: {len(results)} 个")
return results
async def timeout_example():
    """Cancel a long operation after 2 seconds."""
    try:
        result = await asyncio.wait_for(
            asyncio.sleep(10),  # simulate a long-running operation
            timeout=2.0
        )
    except asyncio.TimeoutError:
        print("操作超时,已取消")

# as_completed pattern: handle whichever task finishes first.
async def delayed_result(n: int) -> int:
    """Return n after n * 0.1 seconds."""
    await asyncio.sleep(n * 0.1)
    return n

async def as_completed_demo():
    """Print results in completion order (shortest delay first).

    Fix: the original snippet ran this at module level, where `await`
    is a SyntaxError and create_task has no running event loop -- it
    must live inside a coroutine.
    """
    tasks = [asyncio.create_task(delayed_result(i)) for i in range(5, 0, -1)]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"完成: {result}")

# asyncio.run(structured_concurrent_processing())
# --- Async context managers in practice ---
import asyncio
from contextlib import asynccontextmanager

# 1. Async database connection pool (connections simulated with dicts).
class AsyncConnectionPool:
    """Fixed-size pool of connections backed by an asyncio.Queue."""

    def __init__(self, pool_size: int = 5):
        self.pool_size = pool_size
        # Idle connections wait here; acquire() blocks when it is empty.
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=pool_size)
        self._initialized = False

    async def initialize(self):
        """Fill the pool with `pool_size` ready connections."""
        for i in range(self.pool_size):
            await self._pool.put({"id": i, "status": "ready"})
        self._initialized = True

    async def acquire(self):
        """Take a connection from the pool, waiting if none is idle."""
        conn = await self._pool.get()
        conn["status"] = "in_use"
        return conn

    async def release(self, conn):
        """Return a connection to the pool."""
        conn["status"] = "ready"
        await self._pool.put(conn)

    async def close(self):
        """Drain the pool and mark every connection closed."""
        while not self._pool.empty():
            conn = await self._pool.get()
            conn["status"] = "closed"
        self._initialized = False
@asynccontextmanager
async def get_connection(pool: AsyncConnectionPool):
    """Borrow a connection for the duration of an `async with` block."""
    conn = await pool.acquire()
    try:
        yield conn
    finally:
        # Always return the connection, even if the body raised.
        await pool.release(conn)
# 2. Async locking and rate limiting.
class AsyncRateLimiter:
    """Token-bucket rate limiter for coroutines.

    Fix over the original: it read `asyncio.get_event_loop().time()` in
    `__init__`, which is deprecated (and eventually fails) when no event
    loop is running yet.  The clock is now sampled lazily inside
    `acquire()` via the running loop, so instances can be constructed
    anywhere.
    """

    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate    # tokens replenished per second
        self.burst = burst  # maximum bucket capacity
        self._tokens = burst
        self._last_refill: float | None = None  # set on first acquire()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Take one token, sleeping until the bucket can supply it."""
        async with self._lock:
            now = asyncio.get_running_loop().time()
            if self._last_refill is None:
                self._last_refill = now
            elapsed = now - self._last_refill
            # Refill proportionally to elapsed time, capped at burst.
            self._tokens = min(self.burst, self._tokens + elapsed * self.rate)
            self._last_refill = now
            if self._tokens < 1:
                # Not enough tokens: wait for the deficit to refill.
                wait_time = (1 - self._tokens) / self.rate
                await asyncio.sleep(wait_time)
                self._tokens = 0
            else:
                self._tokens -= 1
# 3. Usage example.
async def demo_pool():
    """Run 10 workers over a pool of only 3 connections."""
    pool = AsyncConnectionPool(pool_size=3)
    await pool.initialize()

    async def worker(worker_id: int):
        # Workers queue up automatically when all connections are busy.
        async with get_connection(pool) as conn:
            await asyncio.sleep(0.1)
            print(f"Worker {worker_id} 使用连接 {conn['id']}")

    await asyncio.gather(*[worker(i) for i in range(10)])
    await pool.close()

# asyncio.run(demo_pool())
# --- Async logging and monitoring ---
import asyncio
import logging
import time
from contextvars import ContextVar

# Context variable that stays isolated across interleaved coroutines.
request_id_var: ContextVar[str] = ContextVar("request_id", default="")

class AsyncContextFilter(logging.Filter):
    """Logging filter that stamps every record with the request_id."""

    def filter(self, record):
        # Fall back to "N/A" when no request id has been set.
        record.request_id = request_id_var.get("N/A")
        return True

# Configure a logger whose format includes the request id.
logger = logging.getLogger("async_app")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s [%(request_id)s] %(levelname)s %(message)s")
)
handler.addFilter(AsyncContextFilter())
logger.addHandler(handler)
async def handle_request(request_id: str):
    """Handle one request with its id attached to every log line."""
    token = request_id_var.set(request_id)
    try:
        logger.info("开始处理请求")
        await asyncio.sleep(0.5)
        logger.info("请求处理完成")
    finally:
        # Restore the previous context value even if handling failed.
        request_id_var.reset(token)

async def main():
    """Handle three requests concurrently; their logs stay separated."""
    await asyncio.gather(
        handle_request("req-001"),
        handle_request("req-002"),
        handle_request("req-003"),
    )

# asyncio.run(main())
# Every output line carries the request_id of its own request.
# --- Pros ---
缺点
总结
异步进阶的核心是掌握任务编排(gather、TaskGroup)、并发控制(Semaphore、Queue)和容错机制(超时、重试、熔断)三大能力。在高并发 I/O 场景下,asyncio 能以单线程实现数千并发连接,但需要严格避免在协程中执行阻塞操作。结构化并发(TaskGroup)是 Python 3.11+ 的推荐模式。
关键知识点
- asyncio.gather 并发执行多个协程,return_exceptions=True 可收集异常而不中断
- asyncio.Semaphore 控制并发上限,asyncio.Queue 实现生产者-消费者解耦
- asyncio.wait_for 提供超时控制:超时时内部任务被取消(收到 CancelledError),调用方则捕获到 TimeoutError
- Python 3.11+ 的 TaskGroup 实现结构化并发,任务异常会自动取消同组任务
- run_in_executor 将阻塞函数在线程池中执行,避免阻塞事件循环
- contextvars 实现异步安全的上下文传递
项目落地视角
- Web 服务使用 FastAPI + async/await 处理高并发请求,数据库驱动选用 asyncpg/aiomysql
- 外部 API 调用统一配置超时、重试和熔断策略,避免级联故障
- CPU 密集型任务使用 ProcessPoolExecutor 在独立进程中执行,避免阻塞事件循环
- 异步代码的日志需要携带 request_id 等上下文信息,便于链路追踪
常见误区
- 在 async 函数中调用同步阻塞函数(如 requests.get、time.sleep),导致整个事件循环卡住
- 无限制地创建协程任务,导致资源耗尽或目标服务被打垮
- 忽略异常处理,协程异常被静默吞掉
- 混用多线程和异步,未正确使用 run_in_executor 桥接
- 在 async 函数中忘记 await 导致协程未被执行
- 使用 asyncio.run() 嵌套调用(会导致 RuntimeError)
进阶路线
- 学习 uvloop 替代默认事件循环,提升异步性能
- 研究 anyio 库实现 asyncio/trio 双后端兼容
- 深入理解 Python 3.12+ 的 asyncio 改进和性能优化
- 探索分布式异步任务调度方案(Celery + asyncio)
适用场景
- 高并发 Web 服务和 API 网关,需要处理大量并发连接
- 实时数据推送、WebSocket 长连接、消息队列消费等 I/O 密集场景
- 微服务间的高并发 RPC 调用和聚合查询
落地建议
- 统一异步基础设施:HTTP 客户端、数据库驱动、Redis 客户端全部选用异步版本
- 为所有外部调用配置合理的超时和重试策略
- 使用 Semaphore 限制并发上限,防止雪崩效应
- 使用 contextvars 实现请求级别的上下文传递
排错清单
- 检查异步函数中是否存在同步阻塞调用(requests、subprocess、file I/O)
- 确认事件循环是否正常运行,是否有未 await 的协程
- 排查任务是否因异常被静默取消,检查 gather 的 return_exceptions 设置
- 确认是否正确处理了 CancelledError
复盘问题
- 你的异步服务在峰值并发下的表现如何?是否有明确的限流和熔断策略?
- 异步代码中的异常是否都能被正确捕获和记录?是否存在静默失败?
- CPU 密集型任务是否被正确隔离到独立进程?对事件循环的影响有多大?
