Python 异步编程深入
大约 10 分钟 · 约 3073 字
Python 异步编程深入
简介
Python 的 asyncio 提供了基于协程的异步编程模型。理解事件循环、协程、Task 和异步上下文管理器的原理,有助于构建高并发的网络应用和 I/O 密集型服务。
特点
协程基础
async/await 语法
import asyncio
import json
import time
# 协程函数
async def fetch_data(url: str) -> dict:
    """Simulate an asynchronous HTTP request."""
    # Stand-in for real network I/O latency.
    await asyncio.sleep(0.5)
    payload = f"Response from {url}"
    return {"url": url, "data": payload}
# 并发执行多个协程
async def fetch_all(urls: list[str]) -> list[dict]:
    """Fetch every URL concurrently, dropping the ones that fail."""
    coros = [fetch_data(u) for u in urls]
    outcomes = await asyncio.gather(*coros, return_exceptions=True)
    successes: list[dict] = []
    for url, outcome in zip(urls, outcomes):
        if isinstance(outcome, Exception):
            # Report the failure; it is excluded from the result set.
            print(f"Error fetching {url}: {outcome}")
        else:
            successes.append(outcome)
    return successes
# TaskGroup(Python 3.11+,推荐)
async def fetch_with_taskgroup(urls: list[str]) -> list[dict]:
    """Fetch all URLs using structured concurrency (TaskGroup, Python 3.11+)."""
    results: dict[str, dict] = {}

    async def fetch_and_store(url: str) -> None:
        # Record either the response or a per-URL error marker.
        try:
            results[url] = await fetch_data(url)
        except Exception as exc:
            results[url] = {"error": str(exc)}

    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tg.create_task(fetch_and_store(url))
    # TaskGroup exit guarantees every task has completed.
    return list(results.values())
# 超时控制
async def fetch_with_timeout(url: str, timeout: float = 5.0) -> dict:
    """Fetch a URL, returning an error payload if it exceeds `timeout` seconds."""
    try:
        # asyncio.timeout (3.11+) cancels the inner await when the budget expires.
        async with asyncio.timeout(timeout):
            result = await fetch_data(url)
    except TimeoutError:
        return {"url": url, "error": "Timeout"}
    return result
# 取消任务
async def cancellable_operation():
    """Long-running operation that cooperates with cancellation."""
    step = 0
    try:
        while step < 100:
            await asyncio.sleep(0.1)
            print(f"Progress: {step}%")
            step += 1
    except asyncio.CancelledError:
        print("操作被取消")
        # Re-raise so the caller observes the cancellation.
        raise
async def main():
    """Start the long-running operation, then cancel it after half a second."""
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(0.5)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("任务已取消")
事件循环
自定义事件循环行为
import asyncio
# 事件循环生命周期:
# 1. 创建事件循环
# 2. 注册协程/Task
# 3. 运行直到所有 Task 完成
# 4. 关闭事件循环
# 手动控制事件循环
def cpu_bound_task(n: int) -> int:
    """CPU-bound work: sum of squares below n.

    Defined at module level because ProcessPoolExecutor must pickle the
    callable to send it to a worker process — a function defined inside
    another function is not picklable and would fail at submit time.
    """
    return sum(i * i for i in range(n))


async def custom_loop():
    """Demonstrate manual event-loop facilities: executors and timer callbacks."""
    loop = asyncio.get_running_loop()

    # Run a blocking function in the default thread-pool executor so the
    # event loop keeps servicing other coroutines in the meantime.
    def blocking_io():
        import time
        time.sleep(1)
        return "blocking result"

    result = await loop.run_in_executor(None, blocking_io)
    print(f"阻塞函数结果: {result}")

    # Run a CPU-bound task in a separate process to sidestep the GIL.
    from concurrent.futures import ProcessPoolExecutor
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_task, 10**7)
        print(f"CPU 任务结果: {result}")

    # Timer callbacks: relative delay (call_later) vs absolute loop time (call_at).
    def periodic_callback():
        print(f"定时回调: {time.strftime('%H:%M:%S')}")

    loop.call_later(5.0, periodic_callback)              # run 5 seconds from now
    loop.call_at(loop.time() + 10.0, periodic_callback)  # run 10 seconds from now
# 信号量控制并发数
async def rate_limited_requests(urls: list[str], max_concurrent: int = 10):
    """Fetch all URLs concurrently with at most `max_concurrent` in flight.

    A semaphore bounds concurrency; TaskGroup (3.11+) waits for every task
    and propagates failures. (The original also created an unused `results`
    list — removed.)
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_fetch(url: str) -> dict:
        # Each fetch holds one semaphore slot for its duration.
        async with semaphore:
            return await fetch_data(url)

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(limited_fetch(url)) for url in urls]
    # TaskGroup exit guarantees completion, so .result() cannot block here.
    return [task.result() for task in tasks]
# 生产者-消费者模式
async def producer_consumer():
"""异步生产者-消费者"""
queue = asyncio.Queue(maxsize=10)
async def producer(id: int):
for i in range(5):
item = f"item-{id}-{i}"
await queue.put(item)
print(f"生产者 {id} → {item}")
await asyncio.sleep(0.1)
async def consumer(id: int):
while True:
item = await queue.get()
print(f"消费者 {id} ← {item}")
await asyncio.sleep(0.2) # 模拟处理
queue.task_done()
# 启动生产者和消费者
producers = [asyncio.create_task(producer(i)) for i in range(3)]
consumers = [asyncio.create_task(consumer(i)) for i in range(2)]
# 等待所有生产者完成
await asyncio.gather(*producers)
# 等待队列处理完毕
await queue.join()
# 取消消费者
for c in consumers:
c.cancel()异步 IO 库
aiohttp 与 aioredis
# aiohttp — 异步 HTTP 客户端/服务端
# pip install aiohttp
import aiohttp
# 异步 HTTP 客户端
class AsyncHttpClient:
    """Thin aiohttp wrapper: lazy session creation, JSON helpers, async context manager.

    Fix: `params`/`data` were annotated `dict` with a `None` default
    (implicit Optional) — corrected to `dict | None`.
    """

    def __init__(self, base_url: str, timeout: float = 30.0):
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # Session is created on first use, not in __init__ (which is sync).
        self._session: aiohttp.ClientSession | None = None

    async def get_session(self) -> aiohttp.ClientSession:
        """Return the live session, creating a new one if absent or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                base_url=self.base_url,
                timeout=self.timeout,
                headers={"Content-Type": "application/json"}
            )
        return self._session

    async def get(self, path: str, params: dict | None = None) -> dict:
        """GET `path`; raise for HTTP error statuses; return the JSON body."""
        session = await self.get_session()
        async with session.get(path, params=params) as response:
            response.raise_for_status()
            return await response.json()

    async def post(self, path: str, data: dict | None = None) -> dict:
        """POST `data` as JSON to `path`; raise for errors; return the JSON body."""
        session = await self.get_session()
        async with session.post(path, json=data) as response:
            response.raise_for_status()
            return await response.json()

    async def close(self):
        """Close the underlying session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()

    async def __aenter__(self):
        # Open the session eagerly so connection errors surface at entry.
        await self.get_session()
        return self

    async def __aexit__(self, *args):
        await self.close()
# 使用
async def fetch_users():
    """Fetch the user list, then the detail record of the first user."""
    async with AsyncHttpClient("https://api.example.com") as client:
        users = await client.get("/users", params={"page": 1})
        # NOTE(review): `users[0]['id']` assumes the /users response body is a
        # JSON array of objects — confirm against the API.
        first = await client.get(f"/users/{users[0]['id']}")
        return first
# 异步迭代器
async def fetch_all_pages(base_url: str):
    """Async generator yielding one page of items at a time until exhausted."""
    async with AsyncHttpClient(base_url) as client:
        page_no = 1
        while True:
            payload = await client.get("/items", params={"page": page_no})
            # An empty items list marks the end of pagination.
            if not payload["items"]:
                break
            yield payload["items"]
            page_no += 1
# 使用异步迭代器
async def process_all_items():
    """Consume the paged async iterator and handle every item."""
    async for page in fetch_all_pages("https://api.example.com"):
        for item in page:
            process_item(item)
# aioredis(redis 异步客户端)
# pip install redis
import redis.asyncio as aioredis
class AsyncRedisClient:
    """Async Redis helper: JSON cache, read-through caching, distributed locks.

    Fix: the class uses the stdlib `json` module, which the file never
    imported — `import json` is added to the top-level imports.
    """

    def __init__(self, url: str = "redis://localhost:6379"):
        self.url = url
        # Pool is opened in connect(); all other methods require it.
        self.redis: aioredis.Redis | None = None

    async def connect(self):
        """Create the connection pool; must be awaited before any other call."""
        self.redis = aioredis.from_url(
            self.url,
            decode_responses=True,
            max_connections=20
        )

    async def cache_get(self, key: str, ttl: int = 300):
        """Return the JSON-decoded value stored at `key`, or None on a miss.

        `ttl` is unused by this method; it is retained so the signature stays
        compatible with existing callers and mirrors cache_set.
        """
        value = await self.redis.get(key)
        if value:
            return json.loads(value)
        return None

    async def cache_set(self, key: str, value, ttl: int = 300):
        """Store `value` as JSON under `key`, expiring after `ttl` seconds."""
        await self.redis.setex(key, ttl, json.dumps(value))

    async def cache_with_fallback(self, key: str, fetcher, ttl: int = 300):
        """Read-through cache: return the cached value or fetch, store, return."""
        cached = await self.cache_get(key)
        if cached is not None:
            return cached
        data = await fetcher()
        await self.cache_set(key, data, ttl)
        return data

    async def acquire_lock(self, lock_name: str, timeout: int = 10):
        """Try to take a distributed lock; return the lock object, or None on failure."""
        lock = self.redis.lock(lock_name, timeout=timeout)
        acquired = await lock.acquire()
        return lock if acquired else None

    async def close(self):
        """Release the connection pool."""
        if self.redis:
            await self.redis.close()
性能优化
uvloop 与避免阻塞
# 1. uvloop — a drop-in high-performance event loop (claimed 2-4x faster
#    than asyncio's default loop; benchmark with your own workload).
# pip install uvloop
import uvloop
# Make uvloop the implementation used by subsequent asyncio.run() calls.
uvloop.install()
# Alternatively, set the policy explicitly before asyncio.run:
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 2. 避免在协程中使用阻塞调用
async def bad_example():
    """Anti-example: blocking calls inside a coroutine stall the whole event loop."""
    # ❌ time.sleep blocks the entire event loop — every coroutine stops.
    import time
    time.sleep(5)
    # ❌ requests is a synchronous HTTP client — it blocks too.
    import requests
    requests.get("https://api.example.com")
async def good_example():
    """Use async equivalents so the event loop stays responsive."""
    # ✅ Non-blocking sleep yields control to other coroutines.
    await asyncio.sleep(5)
    # ✅ aiohttp performs the HTTP request asynchronously.
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as resp:
            return await resp.json()
    # ✅ When a blocking library is unavoidable, offload it to a thread pool.
    # (Shown as comments: in the original these lines sat after the `return`
    # above and were unreachable, referencing an undefined `blocking_function`.)
    # loop = asyncio.get_running_loop()
    # result = await loop.run_in_executor(None, blocking_function, args)
# 3. 批量操作优化
async def batch_process(items: list, batch_size: int = 100):
    """Process a large list in batches of `batch_size`.

    Items within a batch run concurrently; batches run sequentially,
    which caps peak concurrency at `batch_size`.
    """
    collected = []
    for start in range(0, len(items), batch_size):
        chunk = items[start:start + batch_size]
        chunk_results = await asyncio.gather(*(process(item) for item in chunk))
        collected.extend(chunk_results)
    return collected
异步数据库操作
SQLAlchemy 2.0 异步
# pip install sqlalchemy[asyncio] asyncpg
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker
)
from sqlalchemy import select, update, delete
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# Async engine: asyncpg driver with an explicitly tuned connection pool.
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    echo=False,          # set True to log every emitted SQL statement
    pool_size=20,        # persistent connections kept in the pool
    max_overflow=10,     # extra connections allowed under burst load
    pool_timeout=30,     # seconds to wait for a free connection
    pool_recycle=1800    # recycle after 30 min to avoid stale server-side connections
)
# Session factory; expire_on_commit=False keeps loaded attributes usable after commit.
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# 模型定义
class Base(DeclarativeBase):
    """Declarative base shared by all ORM models."""
    pass


class User(Base):
    """Row of the `users` table."""
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(unique=True)  # unique login name
    email: Mapped[str]
    is_active: Mapped[bool] = mapped_column(default=True)  # soft-disable flag
# 异步 CRUD 操作
class UserRepository:
async def create_user(self, username: str, email: str) -> User:
async with async_session() as session:
async with session.begin():
user = User(username=username, email=email)
session.add(user)
await session.flush()
return user
async def get_user(self, user_id: int) -> User | None:
async with async_session() as session:
result = await session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def list_active_users(self, page: int = 1, size: int = 20) -> list[User]:
async with async_session() as session:
offset = (page - 1) * size
result = await session.execute(
select(User)
.where(User.is_active == True)
.order_by(User.id)
.offset(offset)
.limit(size)
)
return list(result.scalars().all())
async def update_email(self, user_id: int, new_email: str) -> bool:
async with async_session() as session:
async with session.begin():
result = await session.execute(
update(User)
.where(User.id == user_id)
.values(email=new_email)
)
return result.rowcount > 0
async def bulk_insert(self, users: list[dict]) -> None:
"""批量插入(高性能)"""
async with async_session() as session:
async with session.begin():
session.add_all([User(**u) for u in users])异步 FastAPI 集成
FastAPI 异步最佳实践
# pip install fastapi uvicorn
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
# Application lifecycle management: resources created before `yield` live for
# the whole application and are torn down after it.
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: open shared clients and stash them on app.state.
    app.state.redis = AsyncRedisClient("redis://localhost:6379")
    await app.state.redis.connect()
    app.state.http_client = AsyncHttpClient("https://api.example.com")
    yield  # application serves requests here
    # Shutdown: release connections.
    await app.state.redis.close()
    await app.state.http_client.close()


app = FastAPI(lifespan=lifespan)

# CORS: restrict to the known frontend origin and the verbs actually used.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://myapp.com"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
# Async dependency: pull the shared Redis client off app.state.
# NOTE(review): `Request` is not imported anywhere in this file — this needs
# `from fastapi import Request` or the annotation fails at definition time.
async def get_redis(request: Request) -> AsyncRedisClient:
    return request.app.state.redis
# Async route: cache-aside read of a single user.
@app.get("/api/users/{user_id}")
async def get_user(user_id: int, redis: AsyncRedisClient = Depends(get_redis)):
    # 1) Try the cache first.
    cached = await redis.cache_get(f"user:{user_id}")
    if cached:
        return cached
    # 2) Cache miss: load from the database.
    # NOTE(review): `user_repo` is assumed to be a module-level UserRepository
    # instance — it is not defined in this file; confirm.
    user = await user_repo.get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    # 3) Populate the cache for subsequent reads.
    # NOTE(review): `model_dump()` is a Pydantic v2 API; the SQLAlchemy `User`
    # model defined earlier has no such method — verify which type is returned.
    await redis.cache_set(f"user:{user_id}", user.model_dump(), ttl=300)
    return user
# Fan out to several backing services concurrently and merge the results.
@app.get("/api/dashboard/{user_id}")
async def get_dashboard(user_id: int):
    # Start all three lookups before awaiting any of them, so they overlap.
    user_task = asyncio.create_task(user_repo.get_user(user_id))
    orders_task = asyncio.create_task(order_repo.get_recent_orders(user_id))
    stats_task = asyncio.create_task(stats_repo.get_user_stats(user_id))

    # Collect the three results once every task has finished.
    user, orders, stats = await asyncio.gather(user_task, orders_task, stats_task)
    return {
        "user": user,
        "recent_orders": orders,
        "statistics": stats,
    }
# 后台任务
from fastapi import BackgroundTasks
async def send_notification(user_id: int, message: str):
    """Deliver a notification in the background (delivery simulated by a sleep)."""
    await asyncio.sleep(1)  # stand-in for the real delivery latency
    print(f"通知已发送: {user_id} - {message}")
@app.post("/api/orders")
async def create_order(
request: CreateOrderRequest,
background_tasks: BackgroundTasks
):
order = await order_repo.create(request)
# 后台任务(不阻塞响应)
background_tasks.add_task(send_notification, order.user_id, "订单已创建")
return order异步测试
pytest-asyncio 测试异步代码
# pip install pytest pytest-asyncio
import pytest
import asyncio
# pytest.ini 配置
# [pytest]
# asyncio_mode = auto
@pytest.fixture
async def db_session():
    """Async DB session fixture: fresh SQLite schema per test, dropped afterwards.

    Fix: the original yielded a session from the module-level `async_session`
    factory, which is bound to the PostgreSQL engine — not to the SQLite test
    engine created here. A local factory bound to the test engine is used instead.
    """
    engine = create_async_engine("sqlite+aiosqlite:///test.db")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    test_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with test_session() as session:
        yield session
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()
@pytest.mark.asyncio
async def test_create_user(db_session: AsyncSession):
    """A created user can be read back by username."""
    new_user = User(username="testuser", email="test@example.com")
    db_session.add(new_user)
    await db_session.commit()

    query = select(User).where(User.username == "testuser")
    fetched = (await db_session.execute(query)).scalar_one()
    assert fetched.email == "test@example.com"
@pytest.mark.asyncio
async def test_concurrent_requests():
    """Ten concurrent GETs mostly succeed (a couple of failures tolerated)."""
    async with AsyncHttpClient("https://api.example.com") as client:
        paths = [f"/items/{i}" for i in range(10)]
        outcomes = await asyncio.gather(
            *(client.get(p) for p in paths), return_exceptions=True
        )
    ok = sum(1 for o in outcomes if not isinstance(o, Exception))
    assert ok >= 8  # tolerate a small number of failures
@pytest.mark.asyncio
async def test_timeout():
    """An operation longer than the timeout window raises TimeoutError."""
    with pytest.raises(TimeoutError):
        async with asyncio.timeout(0.1):
            await asyncio.sleep(1)  # exceeds the 0.1 s budget
@pytest.mark.asyncio
async def test_cancellation():
    """Cancelling a running task surfaces CancelledError at the await site."""
    pending = asyncio.create_task(long_running_operation())
    await asyncio.sleep(0.1)  # let the task start running
    pending.cancel()
    with pytest.raises(asyncio.CancelledError):
        await pending
优点
缺点
总结
Python asyncio 通过事件循环和协程实现高并发 IO 处理。TaskGroup(3.11+)提供结构化并发,Semaphore 控制并发数。异步 IO 库包括 aiohttp(HTTP)、aioredis(Redis)和 SQLAlchemy 2.0(数据库)。避免在协程中使用阻塞调用,必须使用时通过 run_in_executor 放到线程池。uvloop 提供 2-4x 的事件循环性能提升。生产者-消费者模式使用 asyncio.Queue 实现背压控制。
关键知识点
- 先区分这篇内容属于语法能力、工程能力,还是生态工具能力。
- Python 的开发效率来自生态,但可维护性来自结构、测试和规范。
- 脚本一旦进入长期维护,就必须按项目来治理。
- 框架与语言特性类主题要同时理解运行方式和工程组织方式。
项目落地视角
- 统一虚拟环境、依赖锁定、格式化和日志方案。
- 把入口、配置、业务逻辑和工具函数拆开,避免单文件膨胀。
- 对网络请求、文件读写和数据处理结果做异常与样本校验。
- 明确项目入口、配置管理、依赖管理、日志和测试策略。
常见误区
- 把临时脚本直接当生产代码使用。
- 忽略依赖版本、编码、路径和时区差异。
- 只会写 happy path,没有补超时、重试和资源释放。
- 把 notebook 或脚本风格直接带入长期维护项目。
进阶路线
- 把类型注解、测试、打包和部署纳入统一工程流程。
- 继续向异步、性能、数据管线和框架源码层深入。
- 把常用脚本抽成可复用库或 CLI 工具,而不是复制粘贴。
- 继续补齐部署、打包、监控和性能调优能力。
适用场景
- 当你准备把《Python 异步编程深入》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合脚本自动化、数据处理、Web 开发和测试工具建设。
- 当需求强调快速迭代和丰富生态时,Python 往往能快速起步。
落地建议
- 统一使用虚拟环境与依赖锁定,避免环境漂移。
- 对核心函数补类型注解、异常处理和日志,减少“脚本黑盒”。
- 一旦脚本进入生产链路,及时补测试和监控。
排错清单
- 先确认当前解释器、虚拟环境和依赖版本是否正确。
- 检查编码、路径、时区和第三方库行为差异。
- 排查同步阻塞、数据库连接未释放或网络请求无超时。
复盘问题
- 如果把《Python 异步编程深入》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Python 异步编程深入》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Python 异步编程深入》最大的收益和代价分别是什么?
