Python 微服务架构
大约 12 分钟 · 约 3569 字
Python 微服务架构
简介
Python 微服务架构是将单体应用拆分为一组小型、独立部署的服务,每个服务围绕特定业务能力构建,通过轻量级协议(HTTP/gRPC)通信。Python 凭借 FastAPI 的高性能异步框架、丰富的生态和快速开发能力,成为微服务开发的热门选择。
微服务架构解决的核心问题是复杂度管理和独立演进。当单体应用规模持续膨胀(例如代码量达到数十万乃至上百万行、多个团队同时修改同一代码库)时,构建、测试和发布相互牵制,开发效率往往会明显下降。微服务将系统拆分为可以独立开发、测试、部署的小单元,每个团队负责自己的服务,从而大幅提升并行开发效率。
但微服务也引入了分布式系统的复杂性:服务发现、负载均衡、分布式追踪、熔断降级、配置管理、数据一致性等。这些问题需要系统化的解决方案,否则微服务的运维成本会远超单体应用。
特点
FastAPI 微服务模板
项目结构
user-service/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 应用入口
│ ├── config.py # 配置管理
│ ├── dependencies.py # 依赖注入
│ ├── models/
│ │ ├── __init__.py
│ │ ├── user.py # 数据模型
│ │ └── schemas.py # Pydantic 模型
│ ├── api/
│ │ ├── __init__.py
│ │ ├── router.py # 路由注册
│ │ └── v1/
│ │ ├── __init__.py
│ │ └── users.py # 用户接口
│ ├── services/
│ │ ├── __init__.py
│ │ └── user_service.py # 业务逻辑
│ ├── repositories/
│ │ ├── __init__.py
│ │ └── user_repo.py # 数据访问
│ └── middleware/
│ ├── __init__.py
│ ├── logging.py # 日志中间件
│ └── tracing.py # 链路追踪
├── tests/
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── pyproject.toml
FastAPI 应用入口
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.config import Settings
from app.api.router import api_router
from app.middleware.logging import LoggingMiddleware
from app.middleware.tracing import setup_tracing
settings = Settings()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook.

    Code before ``yield`` runs once at startup; code after it runs once at
    shutdown.
    """
    # --- startup ---
    for banner_line in (
        f"服务启动: {settings.SERVICE_NAME} v{settings.VERSION}",
        f"环境: {settings.ENVIRONMENT}",
    ):
        print(banner_line)
    # Open shared resources here, e.g. `await db.connect()`.
    yield
    # --- shutdown ---
    print("服务关闭")
    # Release shared resources here, e.g. `await db.disconnect()`.
app = FastAPI(
    title=settings.SERVICE_NAME,
    description="用户服务 API",
    version=settings.VERSION,
    lifespan=lifespan,
)

# Middleware. Starlette wraps the app so that the middleware added last runs
# outermost, so the two add_middleware calls below must stay in this order.
_cors_options = dict(
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_cors_options)
app.add_middleware(LoggingMiddleware)

# Distributed tracing (instruments the app created above).
setup_tracing(app, settings.JAEGER_ENDPOINT)

# Versioned API routes.
app.include_router(api_router, prefix="/api/v1")
# 健康检查
@app.get("/health")
async def health_check():
    """Liveness endpoint: reports only that the process is up and serving."""
    return dict(status="healthy", service=settings.SERVICE_NAME)
@app.get("/ready")
async def readiness_check():
    """Readiness endpoint: check whether the service's dependencies are usable.

    Returns:
        ``{"ready": bool, "checks": {name: bool}}``. The status is 200 when
        every check passes and 503 otherwise. Kubernetes readiness probes
        judge only the HTTP status (200-399 counts as ready), so a 200 with
        ``"ready": false`` would keep a broken pod in the Service endpoints.
    """
    # Local import keeps this snippet self-contained.
    from fastapi.responses import JSONResponse

    checks = {
        "database": True,  # TODO: replace with `await db.ping()`
        "redis": True,  # TODO: replace with `await redis.ping()`
    }
    all_healthy = all(checks.values())
    body = {"ready": all_healthy, "checks": checks}
    if not all_healthy:
        return JSONResponse(status_code=503, content=body)
    return body

配置管理
# app/config.py
from pydantic_settings import BaseSettings
from typing import List
class Settings(BaseSettings):
"""服务配置 — 从环境变量读取"""
# 基础配置
SERVICE_NAME: str = "user-service"
VERSION: str = "1.0.0"
ENVIRONMENT: str = "development"
DEBUG: bool = False
# 服务端口
PORT: int = 8000
# 数据库
DATABASE_URL: str = "postgresql+asyncpg://user:pass@localhost/users"
DB_POOL_SIZE: int = 20
DB_MAX_OVERFLOW: int = 10
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
# 认证
JWT_SECRET: str = "change-me-in-production"
JWT_ALGORITHM: str = "HS256"
JWT_EXPIRATION_MINUTES: int = 60
# CORS
CORS_ORIGINS: List[str] = ["*"]
# 链路追踪
JAEGER_ENDPOINT: str = "http://localhost:14268/api/traces"
# 服务注册
CONSUL_HOST: str = "localhost"
CONSUL_PORT: int = 8500
class Config:
env_file = ".env"
env_prefix = "USER_SERVICE_" # 环境变量前缀
settings = Settings()Pydantic 数据模型
# app/models/schemas.py
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime
from typing import Optional
from enum import Enum
class UserRole(str, Enum):
    """Role of a user account.

    Members subclass ``str``, so each one compares equal to its wire value
    (``UserRole.ADMIN == "admin"``).
    """

    ADMIN = "admin"  # full administrative access
    USER = "user"  # regular account
    MODERATOR = "moderator"  # content moderation role
class UserCreate(BaseModel):
    """Request body for creating a user."""

    # 3-50 characters made of ASCII letters, digits and underscores.
    username: str = Field(pattern=r'^[a-zA-Z0-9_]+$', min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(max_length=128, min_length=8)
    # Accounts are created as regular users unless a role is supplied.
    role: UserRole = UserRole.USER
class UserUpdate(BaseModel):
    """Request body for updating a user; every field is optional (default None)."""

    email: Optional[EmailStr] = None
    role: Optional[UserRole] = None
class UserResponse(BaseModel):
    """User representation returned by the API.

    ``from_attributes`` lets the model be built directly from an object's
    attributes (e.g. an ORM row) via ``UserResponse.model_validate(obj)``.
    """

    # Pydantic v2 configuration; replaces the deprecated inner `class Config`.
    model_config = {"from_attributes": True}

    id: int
    username: str
    email: str
    role: UserRole
    created_at: datetime
    updated_at: datetime
class UserListResponse(BaseModel):
    """One page of users plus the pagination metadata that produced it."""

    users: list[UserResponse]  # users on this page
    total: int  # total number of matching users
    page: int  # requested page number
    page_size: int  # requested page size
class APIResponse(BaseModel):
    """Uniform response envelope: a status code, a message and an optional payload."""

    code: int = 0  # 0 means success
    message: str = "success"
    data: Optional[dict] = None  # payload; omitted (None) when there is nothing to return

服务间通信 (HTTP)
# app/services/http_client.py
import httpx
from typing import Optional
import logging
logger = logging.getLogger(__name__)


class ServiceClient:
    """HTTP client used to call another service.

    A single ``httpx.AsyncClient`` is created lazily and reused, so calls share
    one connection pool. Call ``close()`` when the client is no longer needed.

    Args:
        base_url: Root URL of the target service; request paths are relative to it.
        timeout: Per-request timeout in seconds.
    """

    def __init__(self, base_url: str, timeout: float = 5.0):
        self.base_url = base_url
        self.timeout = timeout
        self._client: Optional[httpx.AsyncClient] = None

    async def get_client(self) -> httpx.AsyncClient:
        """Return the shared AsyncClient, (re)creating it if missing or closed."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                timeout=self.timeout,
                headers={"Content-Type": "application/json"},
            )
        return self._client

    async def get(self, path: str, params: Optional[dict] = None) -> dict:
        """GET ``path`` and return the decoded JSON body.

        Raises:
            httpx.TimeoutException: The request timed out (logged first).
            httpx.HTTPStatusError: The response status was 4xx/5xx (logged first).
        """
        client = await self.get_client()
        try:
            response = await client.get(path, params=params)
            response.raise_for_status()
            return response.json()
        except httpx.TimeoutException:
            logger.error(f"请求超时: GET {self.base_url}{path}")
            raise
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP 错误: {e.response.status_code}")
            raise

    async def post(self, path: str, data: Optional[dict] = None) -> dict:
        """POST ``data`` as JSON to ``path`` and return the decoded JSON body.

        Raises:
            httpx.TimeoutException: The request timed out (logged first).
            httpx.HTTPStatusError: The response status was 4xx/5xx (logged first).
        """
        client = await self.get_client()
        try:
            response = await client.post(path, json=data)
            response.raise_for_status()
            return response.json()
        except httpx.TimeoutException:
            # Timeouts are logged the same way get() logs them.
            logger.error(f"请求超时: POST {self.base_url}{path}")
            raise
        except httpx.HTTPStatusError as e:
            logger.error(f"POST 错误: {e.response.status_code} - {e.response.text}")
            raise

    async def close(self):
        """Close the underlying AsyncClient if one is open."""
        if self._client is not None and not self._client.is_closed:
            # httpx.AsyncClient is closed with aclose(); it has no close() method.
            await self._client.aclose()
# 服务注册表
class ServiceRegistry:
"""服务地址注册表"""
SERVICES = {
"user-service": "http://user-service:8000",
"order-service": "http://order-service:8001",
"product-service": "http://product-service:8002",
"notification-service": "http://notification-service:8003",
}
@classmethod
def get_client(cls, service_name: str) -> ServiceClient:
base_url = cls.SERVICES.get(service_name)
if not base_url:
raise ValueError(f"未知服务: {service_name}")
return ServiceClient(base_url)gRPC 服务间通信
# gRPC service definition: proto/user_service.proto
# The triple-quoted block below is a bare string literal with no runtime
# effect; its text is the content of the .proto file.
# NOTE(review): created_at is declared int64 -- presumably a Unix timestamp;
# confirm the unit (seconds vs. milliseconds) with the servicer implementation.
# The proto UserResponse has no updated_at, unlike the Pydantic UserResponse.
"""
syntax = "proto3";
package user;
service UserService {
rpc GetUser (GetUserRequest) returns (UserResponse);
rpc ListUsers (ListUsersRequest) returns (UserListResponse);
rpc CreateUser (CreateUserRequest) returns (UserResponse);
}
message GetUserRequest {
int32 id = 1;
}
message CreateUserRequest {
string username = 1;
string email = 2;
string password = 3;
}
message UserResponse {
int32 id = 1;
string username = 2;
string email = 3;
string role = 4;
int64 created_at = 5;
}
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
}
message UserListResponse {
repeated UserResponse users = 1;
int32 total = 2;
}
"""
# gRPC server implementation (example, kept as a string like the proto above).
# It needs user_service_pb2 / user_service_pb2_grpc generated from the proto:
#   python -m grpc_tools.protoc -I proto --python_out=. --grpc_python_out=. proto/user_service.proto
"""
import grpc

import user_service_pb2
import user_service_pb2_grpc


class UserServicer(user_service_pb2_grpc.UserServiceServicer):
    def __init__(self, user_service):
        self.user_service = user_service

    async def GetUser(self, request, context):
        user = await self.user_service.get_by_id(request.id)
        if not user:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('用户不存在')
            return user_service_pb2.UserResponse()
        # NOTE: role / created_at are not mapped yet; fill them in once the
        # domain model's field types are settled (created_at is int64 in the proto).
        return user_service_pb2.UserResponse(
            id=user.id,
            username=user.username,
            email=user.email,
        )

    async def CreateUser(self, request, context):
        user = await self.user_service.create(
            username=request.username,
            email=request.email,
            password=request.password,
        )
        return user_service_pb2.UserResponse(
            id=user.id,
            username=user.username,
            email=user.email,
        )

    # ListUsers is declared in the proto but not implemented here; the
    # generated base class answers it with StatusCode.UNIMPLEMENTED.


# Start the gRPC server. The domain service is passed in explicitly rather
# than read from an undefined module-level name.
async def serve(user_svc, port: int = 50051):
    # Every handler is a coroutine, so grpc.aio needs no thread pool.
    server = grpc.aio.server()
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServicer(user_svc), server
    )
    # Plaintext port: only for trusted internal networks / development.
    # Use add_secure_port with TLS credentials otherwise.
    server.add_insecure_port(f'[::]:{port}')
    await server.start()
    await server.wait_for_termination()
"""

分布式追踪
# app/middleware/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
import logging
logger = logging.getLogger(__name__)


def setup_tracing(app, jaeger_endpoint: str, service_name: "str | None" = None):
    """Configure OpenTelemetry tracing with a Jaeger exporter.

    Args:
        app: FastAPI application to auto-instrument.
        jaeger_endpoint: Jaeger collector HTTP endpoint, e.g.
            "http://localhost:14268/api/traces" (the Settings default).
        service_name: ``service.name`` resource attribute shown in the Jaeger
            UI. Defaults to ``app.title`` (main.py sets it to SERVICE_NAME).
    """
    from opentelemetry.sdk.resources import Resource

    # Send spans straight to the collector URL. Do not derive an agent host
    # with jaeger_endpoint.split(":")[0]: for a URL such as
    # "http://localhost:14268/api/traces" that yields "http", and nothing is
    # ever delivered.
    # NOTE: the Jaeger exporter is deprecated upstream. Jaeger also accepts
    # OTLP, so the OTLP exporter is the long-term replacement.
    jaeger_exporter = JaegerExporter(collector_endpoint=jaeger_endpoint)

    # Without a service.name attribute, spans show up as "unknown_service".
    name = service_name or getattr(app, "title", None)
    resource = Resource.create({"service.name": name} if name else {})

    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
    trace.set_tracer_provider(provider)

    # Auto-instrument incoming FastAPI requests.
    FastAPIInstrumentor.instrument_app(app)
    # Auto-instrument outgoing httpx calls (propagates trace context).
    HTTPXClientInstrumentor().instrument()
    logger.info(f"链路追踪已配置: {jaeger_endpoint}")
# Usage example: creating spans by hand
from opentelemetry import trace

tracer = trace.get_tracer(__name__)


async def process_order(order_id: int):
    """Example business method: one parent span with a child span per step."""
    with tracer.start_as_current_span("process_order") as root_span:
        root_span.set_attribute("order.id", order_id)
        # Each step runs inside its own child span of "process_order":
        # validate the order, deduct inventory, create the payment.
        for step_name in ("validate_order", "deduct_inventory", "create_payment"):
            with tracer.start_as_current_span(step_name):
                pass  # placeholder for the step's real work
        root_span.set_attribute("order.status", "completed")

熔断器
# Hand-rolled circuit breaker using only the standard library; the third-party `circuitbreaker` package is NOT used here.
import time
import logging
from functools import wraps
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)


@dataclass
class CircuitState:
    """Mutable state of one circuit breaker."""

    failure_count: int = 0  # consecutive failures; a success while closed resets it
    last_failure_time: float = 0.0  # time.monotonic() of the most recent failure
    state: str = "closed"  # one of: closed, open, half_open
    success_count: int = 0  # successful trial calls in the current half-open period


class CircuitBreaker:
    """Circuit breaker decorator for async callables.

    State transitions:
        Closed    -> (failure_threshold consecutive failures) -> Open
        Open      -> (recovery_timeout seconds elapsed)        -> Half-Open
        Half-Open -> (half_open_max_calls successes)           -> Closed
        Half-Open -> (any failure)                             -> Open

    Args:
        name: Label used in log and error messages.
        failure_threshold: Consecutive failures that open the circuit.
        recovery_timeout: Seconds to stay open before allowing trial calls.
        half_open_max_calls: Successful trial calls needed in the half-open
            state before the circuit closes again. Trial calls are not
            rate-limited; every call arriving while half-open is let through.

    Raises (from decorated calls):
        CircuitBreakerOpenError: The circuit is open and the call was rejected.
        Any exception raised by the wrapped function is re-raised unchanged.
    """

    def __init__(
        self,
        name: str = "default",
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 3,
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.state = CircuitState()

    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await self._call(func, *args, **kwargs)
        return wrapper

    async def _call(self, func, *args, **kwargs):
        if self.state.state == "open":
            # Monotonic clock: unaffected by wall-clock jumps (NTP, manual changes).
            if time.monotonic() - self.state.last_failure_time > self.recovery_timeout:
                self.state.state = "half_open"
                # Count trial successes from zero for each half-open period.
                self.state.success_count = 0
                logger.info(f"熔断器 [{self.name}] 转为半开状态")
            else:
                raise CircuitBreakerOpenError(
                    f"熔断器 [{self.name}] 处于开启状态,拒绝请求"
                )
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        if self.state.state == "half_open":
            self.state.success_count += 1
            if self.state.success_count >= self.half_open_max_calls:
                self.state.state = "closed"
                self.state.failure_count = 0
                self.state.success_count = 0
                logger.info(f"熔断器 [{self.name}] 恢复为关闭状态")
        elif self.state.state == "closed":
            # Only *consecutive* failures should trip the breaker; without this
            # reset, sporadic failures spread over time would eventually open it.
            self.state.failure_count = 0

    def _on_failure(self):
        self.state.failure_count += 1
        self.state.last_failure_time = time.monotonic()
        if self.state.state == "half_open":
            self.state.state = "open"
            self.state.success_count = 0
            logger.warning(f"熔断器 [{self.name}] 半开状态失败,重新打开")
        elif self.state.failure_count >= self.failure_threshold:
            self.state.state = "open"
            logger.warning(
                f"熔断器 [{self.name}] 达到失败阈值 {self.failure_threshold},打开熔断"
            )


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""
# 使用示例
breaker = CircuitBreaker(name="order-service", failure_threshold=3, recovery_timeout=30)
@breaker
async def call_order_service(order_id: int):
"""带熔断的服务调用"""
client = ServiceRegistry.get_client("order-service")
return await client.get(f"/api/v1/orders/{order_id}")API 网关配置
# API 网关: 使用 FastAPI 实现简单的网关
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
import httpx
import logging
logger = logging.getLogger(name=__name__)
# The gateway is a single FastAPI app; the catch-all proxy route below is registered on it.
app = FastAPI(title="API Gateway")
# Routing table: public path prefix -> owning service and upstream path.
# Insertion order matters, because the proxy uses the first matching prefix.
_API_PREFIX = "/api/v1"
_RESOURCE_OWNERS = (
    ("users", "user-service"),
    ("orders", "order-service"),
    ("products", "product-service"),
)
ROUTE_TABLE = {
    f"{_API_PREFIX}/{resource}": {"service": service, "rewrite": f"{_API_PREFIX}/{resource}"}
    for resource, service in _RESOURCE_OWNERS
}

# Base URL of each backend service, as reached from the gateway host (localhost ports).
SERVICE_URLS = {
    service: f"http://localhost:{port}"
    for service, port in (
        ("user-service", 8000),
        ("order-service", 8001),
        ("product-service", 8002),
    )
}
# Authentication helper (called explicitly by gateway_proxy)
async def verify_token(request: Request):
    """Extract the bearer token from the ``Authorization`` header.

    Returns:
        The raw token string.

    Raises:
        HTTPException: 401 when the header is missing or is not a Bearer credential.

    TODO: the token is NOT validated yet. Decode and verify the JWT
    (signature, expiry) before trusting it; until then any token is accepted.
    """
    scheme, _, token = request.headers.get("Authorization", "").partition(" ")
    token = token.strip()
    if not scheme and not token:
        raise HTTPException(
            status_code=401,
            detail="Missing token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Accept only the Bearer scheme (case-insensitive). Anything else, such as
    # "Basic abc", must not be passed through as if it were a token.
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(
            status_code=401,
            detail="Invalid authorization scheme",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # JWT verification goes here (see TODO above).
    return token
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_proxy(request: Request, path: str):
"""请求代理"""
# 找到对应服务
target_route = None
for prefix, route in ROUTE_TABLE.items():
if f"/{path}".startswith(prefix):
target_route = route
break
if not target_route:
raise HTTPException(status_code=404, detail="Route not found")
# 认证
await verify_token(request)
# 转发请求
service_url = SERVICE_URLS[target_route["service"]]
target_url = f"{service_url}/{path}"
async with httpx.AsyncClient() as client:
response = await client.request(
method=request.method,
url=target_url,
headers=dict(request.headers),
content=await request.body(),
params=dict(request.query_params),
timeout=10.0,
)
return StreamingResponse(
iter([response.content]),
status_code=response.status_code,
headers=dict(response.headers),
)Docker 部署
# Dockerfile
# Multi-stage build
FROM python:3.12-slim AS builder
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Runtime image
FROM python:3.12-slim
WORKDIR /app
# Unbuffered stdout/stderr, so print()/log output reaches `docker logs`
# immediately instead of sitting in a block buffer; no .pyc files at runtime.
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1
# Security: run as a non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
# Copy installed dependencies and application code
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY app/ ./app/
# Switch to the non-root user
USER appuser
EXPOSE 8000
# Health check. httpx.get() does not raise on 4xx/5xx responses, so
# raise_for_status() is needed for an unhealthy /health to fail the check.
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD python -c "import httpx; httpx.get('http://localhost:8000/health', timeout=3).raise_for_status()"
# Start
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml — development environment
# The top-level `version` key is obsolete in Docker Compose V2 (it only
# triggers a warning); it is kept for the legacy docker-compose v1 CLI.
version: '3.8'
services:
  # User service
  user-service:
    build:
      context: ./user-service
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - USER_SERVICE_DATABASE_URL=postgresql+asyncpg://user:pass@postgres:5432/users
      - USER_SERVICE_REDIS_URL=redis://redis:6379/0
      - USER_SERVICE_JWT_SECRET=dev-secret-key
      - USER_SERVICE_ENVIRONMENT=development
      # Inside the container "localhost" is the container itself, so point
      # tracing at the jaeger service instead of the localhost default.
      - USER_SERVICE_JAEGER_ENDPOINT=http://jaeger:14268/api/traces
    depends_on:
      - postgres
      - redis
    healthcheck:
      # python:3.12-slim has no curl. urllib.request.urlopen exits non-zero on
      # connection errors and on HTTP status >= 400.
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=3)"]
      interval: 30s
      timeout: 5s
      retries: 3

  # Order service
  order-service:
    build:
      context: ./order-service
      dockerfile: Dockerfile
    ports:
      # host 8001 -> container 8000; other containers reach it at order-service:8000
      - "8001:8000"
    environment:
      - ORDER_SERVICE_DATABASE_URL=postgresql+asyncpg://user:pass@postgres:5432/orders
    depends_on:
      - postgres
      - redis

  # PostgreSQL
  # NOTE: the image only creates the database named by POSTGRES_DB, which
  # defaults to POSTGRES_USER ("user"). The "users" and "orders" databases
  # used above must be created, e.g. by a script in /docker-entrypoint-initdb.d.
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  # Redis
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  # Jaeger tracing
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # UI
      - "14268:14268"  # Collector (HTTP)

  # Consul service registry
  consul:
    # The Docker Hub library "consul" image is deprecated and no longer
    # provides a `latest` tag; HashiCorp publishes hashicorp/consul instead.
    image: hashicorp/consul:latest
    ports:
      - "8500:8500"

volumes:
  postgres_data:

K8s 部署
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
  labels:
    app: user-service
spec:
  # spec.replicas is intentionally omitted. The HorizontalPodAutoscaler below
  # owns the replica count, and re-applying a manifest that pins `replicas`
  # would reset the HPA's current scale on every deploy.
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: registry.company.com/user-service:1.0.0
          ports:
            - containerPort: 8000
          env:
            - name: USER_SERVICE_DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: user-service-secrets
                  key: database-url
            - name: USER_SERVICE_JWT_SECRET
              valueFrom:
                secretKeyRef:
                  name: user-service-secrets
                  key: jwt-secret
            # NOTE(review): ENVIRONMENT, REDIS_URL and JAEGER_ENDPOINT are not
            # set here, so the Settings defaults ("development", localhost URLs)
            # apply; set them for the cluster.
          # NOTE(review): the image starts uvicorn with --workers 4; confirm
          # four worker processes fit the 500m CPU / 512Mi memory limits.
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          # Kubernetes treats any HTTP 200-399 as ready, so /ready must return
          # a non-2xx status (e.g. 503) when a dependency is unavailable.
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: user-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: user-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

优点
缺点
性能注意事项
- 服务间调用延迟:每次跨服务 HTTP/gRPC 调用都会增加网络往返与序列化开销(同机房内通常为毫秒级,跨可用区、高负载或新建连接时可达数十毫秒),延迟会沿调用链逐级累加,需控制调用链长度
- 连接池:httpx.AsyncClient 应复用,不要每次请求创建新连接
- 序列化开销:JSON 序列化在大数据量时较慢,gRPC 使用 protobuf 更高效
- 数据库连接:每个服务独立连接池,注意总连接数不要超过数据库上限
- 健康检查频率:K8s 默认每 10 秒健康检查,频率不要太高
- 日志量:每个服务都产生日志,需要集中收集和索引
总结
Python 微服务架构以 FastAPI 为核心框架,配合 Docker 容器化和 K8s 编排,提供了从开发到部署的完整方案。关键基础设施包括服务发现(Consul)、链路追踪(Jaeger)、熔断器、API 网关等。核心原则是保持服务小而独立、接口清晰、通信可靠、可观测性强。
关键知识点
- FastAPI 微服务模板 — 分层架构(路由-服务-仓库)
- 服务间通信 — HTTP (httpx) 和 gRPC 两种方式
- 分布式追踪 — OpenTelemetry + Jaeger 全链路追踪
- 熔断器 — 防止级联故障的三状态机制
- API 网关 — 统一入口、认证、路由
- Docker 多阶段构建 — 减小镜像体积
- K8s 部署 — Deployment + Service + HPA
- 配置管理 — 环境变量 + Pydantic Settings
常见误区
- 过度拆分:一个简单的 CRUD 应用拆成 10 个服务,运维成本远超收益
- 忽视可观测性:没有日志、追踪、监控就上线微服务,出问题无法排查
- 共享数据库:多个服务直接访问同一个数据库,耦合度高
- 同步调用链过长:A->B->C->D,任何一环出问题整条链失败
- 忽视服务发现:硬编码服务地址,扩容时需要手动更新配置
- 不设超时:服务调用没有超时,一个慢服务拖垮调用链
进阶路线
- 入门:FastAPI 单服务、Docker 部署
- 进阶:服务间通信、链路追踪、熔断器
- 高级:API 网关、K8s 部署、服务网格(Istio)
- 专家:事件驱动架构、Saga 分布式事务、混沌工程
适用场景
- 大型互联网应用(电商、社交、SaaS)
- 多团队协作的企业级应用
- 需要弹性扩展的高并发系统
- 不同模块技术栈差异较大的系统
- 需要独立部署和发版的业务模块
落地建议
- 第一步:用 FastAPI 搭建第一个微服务模板
- 第二步:配置 Docker 和 docker-compose 本地开发环境
- 第三步:实现服务间 HTTP 通信和健康检查
- 第四步:集成链路追踪和日志收集
- 第五步:添加熔断器和重试机制
- 持续:逐步拆分更多服务,建立 K8s 部署流水线
排错清单
复盘问题
- 当前有多少个微服务?哪些是热点服务?
- 服务间调用的 P99 延迟是多少?
- 上次服务故障是什么原因?是否有级联故障?
- CI/CD 流水线的部署频率和成功率是多少?
- 每个服务的资源使用率如何?是否有过度分配?
- 链路追踪覆盖率如何?是否有未追踪的调用?
