Python gRPC 服务
阅读时间:大约 10 分钟,约 3030 字
Python gRPC 服务
简介
gRPC 是 Google 开源的高性能 RPC 框架,基于 Protocol Buffers 序列化和 HTTP/2 传输协议,支持双向流、连接复用和强类型接口定义。Python 中通过 grpcio 库实现 gRPC 服务的定义、实现和调用,适合微服务间的高效通信。
特点
实现
Protobuf 定义与代码生成
// proto/order.proto
syntax = "proto3";
package order;
// Order service definition: one or more RPCs for each gRPC call style.
service OrderService {
// Unary RPCs
rpc GetOrder(OrderRequest) returns (OrderResponse);
rpc CreateOrder(CreateOrderRequest) returns (OrderResponse);
// Server streaming: query order history
rpc ListOrders(ListOrdersRequest) returns (stream OrderResponse);
// Client streaming: bulk-import orders
rpc ImportOrders(stream CreateOrderRequest) returns (ImportResult);
// Bidirectional streaming: real-time order status updates
rpc StreamOrderUpdates(stream OrderSubscription) returns (stream OrderUpdate);
}
// Request of GetOrder: identifies the order to fetch.
message OrderRequest {
string order_id = 1;
}
// Request of CreateOrder; also the element type of the ImportOrders
// client stream.
message CreateOrderRequest {
string user_id = 1;
repeated OrderItem items = 2;
string shipping_address = 3;
}
// One line item of an order. The Python servicers compute an order total
// as the sum of quantity * price over all items.
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double price = 3;
}
// An order as returned by GetOrder / CreateOrder and streamed by ListOrders.
message OrderResponse {
string order_id = 1;
string user_id = 2;
string status = 3;
repeated OrderItem items = 4;
double total_amount = 5;
// Filled by the Python servicers with datetime.now().isoformat().
string created_at = 6;
}
// Request of ListOrders.
message ListOrdersRequest {
string user_id = 1;
// The Python servicers fall back to 20 when this is 0 (unset).
int32 page_size = 2;
// NOTE(review): not read by the servicer implementations in this article.
string page_token = 3;
}
// Response of ImportOrders: per-batch counters plus collected error texts.
message ImportResult {
int32 total = 1;
int32 success = 2;
int32 failed = 3;
repeated string errors = 4;
}
// Element of the StreamOrderUpdates request stream.
message OrderSubscription {
string order_id = 1;
// The Python servicers only emit updates when this is true.
bool subscribe = 2;
}
message OrderUpdate {
string order_id = 1;
string status = 2;
string message = 3;
string updated_at = 4;
}
# 生成 Python 代码
python -m grpc_tools.protoc \
-I./proto \
--python_out=./src/generated \
--grpc_python_out=./src/generated \
./proto/order.proto
服务端实现
# server.py
import logging
import threading
import time
import uuid
from concurrent import futures
from datetime import datetime

import grpc

import order_pb2
import order_pb2_grpc
logger = logging.getLogger(__name__)
class OrderServicer(order_pb2_grpc.OrderServiceServicer):
    """In-memory implementation of the ``OrderService`` gRPC API.

    Orders live in ``self.orders`` (order_id -> ``OrderResponse``), a plain
    dict standing in for a database.  The synchronous gRPC server runs
    handlers on a thread pool, so every read or write of that dict happens
    while holding ``self._lock``.
    """

    # Page size used by ListOrders when the request leaves page_size unset (0)
    # or sends a non-positive value.
    DEFAULT_PAGE_SIZE = 20

    def __init__(self):
        self.orders = {}  # simulated database
        self._lock = threading.Lock()  # guards self.orders

    def _store_order(self, request, status):
        """Build an order from a ``CreateOrderRequest`` and store it.

        Args:
            request: the ``CreateOrderRequest`` describing the order.
            status: value for the stored order's ``status`` field.

        Returns:
            The stored ``OrderResponse``.
        """
        order = order_pb2.OrderResponse(
            user_id=request.user_id,
            status=status,
            items=request.items,
            total_amount=sum(
                item.quantity * item.price for item in request.items
            ),
            created_at=datetime.now().isoformat(),
        )
        with self._lock:
            # Only 8 hex digits (32 bits) of the UUID are kept, so collisions
            # are possible; regenerate rather than overwrite an existing order.
            order_id = f"ORD-{uuid.uuid4().hex[:8].upper()}"
            while order_id in self.orders:
                order_id = f"ORD-{uuid.uuid4().hex[:8].upper()}"
            order.order_id = order_id
            self.orders[order_id] = order
        return order

    @staticmethod
    def _status_update(order_id, status, message):
        """Return an ``OrderUpdate`` stamped with the current local time."""
        return order_pb2.OrderUpdate(
            order_id=order_id,
            status=status,
            message=message,
            updated_at=datetime.now().isoformat(),
        )

    def GetOrder(self, request, context):
        """Unary RPC: return the order with ``request.order_id``.

        When the id is unknown the call status is set to NOT_FOUND and an
        empty ``OrderResponse`` is returned.
        """
        order_id = request.order_id
        with self._lock:
            order = self.orders.get(order_id)
        if order is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"订单不存在: {order_id}")
            return order_pb2.OrderResponse()
        return order

    def CreateOrder(self, request, context):
        """Unary RPC: create, store and return a new order."""
        order = self._store_order(request, status="created")
        logger.info(
            f"创建订单: {order.order_id}, 用户: {request.user_id}, "
            f"金额: {order.total_amount}"
        )
        return order

    def ListOrders(self, request, context):
        """Server streaming RPC: yield the orders of ``request.user_id``.

        At most ``page_size`` orders are streamed (``DEFAULT_PAGE_SIZE`` when
        page_size is not positive).  ``page_token`` is not consulted, so only
        the first page is ever returned.
        """
        limit = request.page_size
        if limit <= 0:
            # A negative value would otherwise slice from the end of the list.
            limit = self.DEFAULT_PAGE_SIZE
        with self._lock:
            # Snapshot under the lock; yielding happens outside it so a slow
            # client never blocks writers.
            user_orders = [
                o for o in self.orders.values()
                if o.user_id == request.user_id
            ]
        for order in user_orders[:limit]:
            if not context.is_active():
                return  # client cancelled or deadline passed
            yield order
            time.sleep(0.05)  # simulate gradual delivery

    def ImportOrders(self, request_iterator, context):
        """Client streaming RPC: import every order sent by the client.

        A failing record is counted and its error text collected; it never
        aborts the rest of the batch.

        Returns:
            ``ImportResult`` with total / success / failed counters and the
            collected error messages.
        """
        result = order_pb2.ImportResult()
        for request in request_iterator:
            result.total += 1
            try:
                if not request.user_id:
                    raise ValueError("缺少 user_id")
                self._store_order(request, status="imported")
            except Exception as e:  # deliberate: best-effort per record
                result.failed += 1
                result.errors.append(str(e))
            else:
                result.success += 1
        return result

    def StreamOrderUpdates(self, request_iterator, context):
        """Bidirectional streaming RPC: emit status updates per subscription.

        For every incoming message with ``subscribe`` set, a "processing"
        update is sent, followed one second later by a "shipped" update.
        Messages with ``subscribe`` unset produce no output.
        """
        for subscription in request_iterator:
            if not subscription.subscribe:
                continue
            yield self._status_update(
                subscription.order_id, "processing", "订单正在处理中"
            )
            time.sleep(1)
            if not context.is_active():
                return  # client went away while we were waiting
            yield self._status_update(
                subscription.order_id, "shipped", "订单已发货"
            )
def serve():
    """Run the synchronous order service on port 50051 until it terminates.

    Builds a gRPC server backed by a 10-worker thread pool, registers an
    ``OrderServicer`` on it, binds an insecure port on all interfaces and
    then blocks in ``wait_for_termination()``.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    servicer = OrderServicer()
    order_pb2_grpc.add_OrderServiceServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    logger.info("gRPC 服务启动,端口: 50051")
    grpc_server.start()
    grpc_server.wait_for_termination()
# if __name__ == "__main__":
#     serve()
客户端调用
# client.py
import grpc
import order_pb2
import order_pb2_grpc
def run(target="localhost:50051", timeout=None):
    """Exercise the order service: create, get, list and bulk-import orders.

    Args:
        target: address of the gRPC server to connect to.
        timeout: per-call deadline in seconds passed to every RPC; ``None``
            (the default) sets no deadline.

    The channel is used as a context manager so it is closed even when one
    of the calls raises ``grpc.RpcError``.
    """
    with grpc.insecure_channel(target) as channel:
        stub = order_pb2_grpc.OrderServiceStub(channel)

        # 1. Create an order
        create_request = order_pb2.CreateOrderRequest(
            user_id="U-001",
            items=[
                order_pb2.OrderItem(product_id="P-001", quantity=2, price=49.9),
                order_pb2.OrderItem(product_id="P-002", quantity=1, price=199.0),
            ],
            shipping_address="北京市海淀区",
        )
        order = stub.CreateOrder(create_request, timeout=timeout)
        print(f"创建订单: {order.order_id}, 金额: {order.total_amount}")

        # 2. Fetch the order that was just created
        result = stub.GetOrder(
            order_pb2.OrderRequest(order_id=order.order_id), timeout=timeout
        )
        print(f"查询订单: {result.order_id}, 状态: {result.status}")

        # 3. Server streaming: list orders
        print("\n--- 订单列表 ---")
        list_request = order_pb2.ListOrdersRequest(user_id="U-001", page_size=10)
        for o in stub.ListOrders(list_request, timeout=timeout):
            print(f" {o.order_id}: {o.status}, {o.total_amount}")

        # 4. Client streaming: bulk import
        def generate_orders():
            for i in range(3):
                yield order_pb2.CreateOrderRequest(
                    user_id=f"U-{i:03d}",
                    items=[
                        order_pb2.OrderItem(
                            product_id="P-001", quantity=1, price=10.0
                        )
                    ],
                )

        import_result = stub.ImportOrders(generate_orders(), timeout=timeout)
        print(
            f"\n导入结果: 总计 {import_result.total}, "
            f"成功 {import_result.success}"
        )
# run()
拦截器与健康检查
# interceptors.py
import grpc
import time
import logging
logger = logging.getLogger(__name__)
class LoggingInterceptor(grpc.ServerInterceptor):
    """Server interceptor that logs every request and times unary-unary RPCs.

    The start of every intercepted call is logged.  Unary-unary handlers are
    additionally wrapped so their completion (or failure) and duration are
    logged; streaming handlers and unknown methods are passed through
    unchanged.
    """

    def intercept_service(self, continuation, handler_call_details):
        """Return the (possibly wrapped) handler for the intercepted call.

        Args:
            continuation: callable returning the next handler in the chain.
            handler_call_details: describes the call; ``method`` is logged.

        Returns:
            A wrapping unary-unary handler, or the original handler (which
            may be ``None`` when the method is not served).
        """
        method = handler_call_details.method
        logger.info(f"gRPC 请求开始: {method}")
        handler = continuation(handler_call_details)
        if handler is None or not handler.unary_unary:
            return handler

        # The RpcMethodHandler object itself is not callable; the behavior
        # lives in its unary_unary attribute.
        behavior = handler.unary_unary

        def wrapper(request, context):
            start = time.time()
            try:
                response = behavior(request, context)
            except Exception as e:
                elapsed = time.time() - start
                logger.error(
                    f"gRPC 请求失败: {method}, 耗时: {elapsed:.3f}s, 错误: {e}"
                )
                raise
            elapsed = time.time() - start
            logger.info(f"gRPC 请求完成: {method}, 耗时: {elapsed:.3f}s")
            return response

        # Carry over the (de)serializers: without them the wrapper would
        # receive raw bytes and its response would not be serialized.
        return grpc.unary_unary_rpc_method_handler(
            wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )
class AuthInterceptor(grpc.ServerInterceptor):
    """Server interceptor that rejects calls lacking the expected bearer token.

    A call is accepted only when its ``authorization`` metadata value equals
    ``"Bearer <VALID_TOKEN>"``; every other call is answered with
    UNAUTHENTICATED.
    """

    # NOTE(review): hard-coded demo secret; load it from configuration or a
    # secret store before using this outside of an example.
    VALID_TOKEN = "secret-token"

    def intercept_service(self, continuation, handler_call_details):
        """Return the real handler for authenticated calls, a denier otherwise.

        Args:
            continuation: callable returning the next handler in the chain.
            handler_call_details: describes the call; its
                ``invocation_metadata`` is searched for ``authorization``.
        """
        # Local import keeps this snippet self-contained.
        import hmac

        metadata = dict(handler_call_details.invocation_metadata or ())
        token = metadata.get("authorization", "")
        expected = f"Bearer {self.VALID_TOKEN}"
        # compare_digest runs in constant time, so response timing does not
        # leak how much of the token matched.  Both sides are encoded to
        # bytes because compare_digest rejects non-ASCII str arguments.
        authorized = isinstance(token, str) and hmac.compare_digest(
            token.encode("utf-8"), expected.encode("utf-8")
        )
        if not authorized:
            return grpc.unary_unary_rpc_method_handler(
                lambda req, ctx: self._abort(ctx)
            )
        return continuation(handler_call_details)

    def _abort(self, context):
        """Terminate the call with UNAUTHENTICATED."""
        context.abort(grpc.StatusCode.UNAUTHENTICATED, "认证失败")
# 在服务端使用拦截器
# server = grpc.server(
# futures.ThreadPoolExecutor(max_workers=10),
# interceptors=[LoggingInterceptor(), AuthInterceptor()],
# )
优点
缺点
总结
gRPC 是微服务间高性能通信的最佳选择,通过 Protobuf 接口定义确保类型安全和跨语言互操作。四种流模式覆盖了同步调用、实时推送和批量传输等场景。但 gRPC 不适合浏览器直接调用,需要通过 gRPC-Web 或 REST 网关桥接。
关键知识点
- .proto 文件定义服务接口和消息格式,protoc 工具自动生成代码
- 四种通信模式:Unary(一元)、Server Streaming、Client Streaming、Bidirectional Streaming
- gRPC 状态码:OK、NOT_FOUND、INVALID_ARGUMENT、UNAUTHENTICATED 等
- 拦截器类似中间件,用于认证、日志、指标采集等横切逻辑
项目落地视角
- 微服务内部通信使用 gRPC,对外 API 使用 REST/gRPC-Web 网关
- .proto 文件独立版本管理,作为服务契约在团队间共享
- 配置连接池、超时、重试和负载均衡策略
- 使用 grpcurl 和 grpcui 进行开发和调试
常见误区
- 用 gRPC 替代所有 HTTP 通信,包括面向浏览器的前端接口
- 忽略 Protobuf 向后兼容性规则,随意修改字段编号
- 不设置超时和重试,导致请求无限等待
- 用单次 gRPC 调用发送大消息而不做分块(流式)传输,超出默认消息大小限制(4MB)
进阶路线
- 学习 gRPC-Web 和 Envoy 网关实现浏览器端调用
- 研究 gRPC 的健康检查和负载均衡(客户端侧/服务端侧)
- 了解 Protobuf 的高级特性(Any、Oneof、Map)
- 探索 Connect 协议作为 gRPC 的现代替代方案
适用场景
- 微服务间的高性能内部通信
- 需要实时流式数据传输的场景
- 多语言微服务架构需要统一的接口定义
落地建议
- .proto 文件放在独立仓库或目录,通过 CI 自动生成和发布 SDK
- 为所有 gRPC 方法配置超时(deadline),避免级联阻塞
- 使用拦截器统一处理认证、日志和指标
排错清单
- 检查 .proto 文件的字段编号是否与生成的代码一致
- 确认服务端和客户端的 Protobuf 版本是否兼容
- 排查消息大小是否超过 gRPC 默认限制(4MB)
复盘问题
- 你的微服务间通信有多少已经迁移到 gRPC?迁移的收益如何衡量?
- .proto 文件的变更管理流程是什么?如何保证向后兼容?
- gRPC 服务的监控指标有哪些?延迟和错误率是否在可控范围内?
异步 gRPC 服务(asyncio)
# async_server.py — 基于 asyncio 的异步 gRPC 服务
import asyncio
import grpc
from concurrent import futures
import logging
import order_pb2
import order_pb2_grpc
logger = logging.getLogger(__name__)
class AsyncOrderServicer(order_pb2_grpc.OrderServiceServicer):
    """asyncio implementation of the ``OrderService`` gRPC API.

    Orders are kept in ``self.orders`` (order_id -> ``OrderResponse``).
    Writes go through ``self._lock``; no ``await`` happens inside the
    critical section, so the lock mainly documents intent.
    """

    # Page size used by ListOrders when page_size is unset (0) or not positive.
    DEFAULT_PAGE_SIZE = 20

    def __init__(self):
        self.orders = {}
        self._lock = asyncio.Lock()

    @staticmethod
    def _build_order(request, status):
        """Return a new ``OrderResponse`` for a ``CreateOrderRequest``.

        The imports are local because the module's top-level imports do not
        include ``uuid`` or ``datetime``.
        """
        import uuid
        from datetime import datetime

        return order_pb2.OrderResponse(
            order_id=f"ORD-{uuid.uuid4().hex[:8].upper()}",
            user_id=request.user_id,
            status=status,
            items=request.items,
            total_amount=sum(
                item.quantity * item.price for item in request.items
            ),
            created_at=datetime.now().isoformat(),
        )

    async def GetOrder(self, request, context):
        """Unary RPC: return the requested order.

        Unknown ids set the status to NOT_FOUND and return an empty
        ``OrderResponse``.
        """
        await asyncio.sleep(0.01)  # simulate an I/O operation
        order = self.orders.get(request.order_id)
        if order is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"订单不存在: {request.order_id}")
            return order_pb2.OrderResponse()
        return order

    async def CreateOrder(self, request, context):
        """Unary RPC: create, store and return a new order."""
        async with self._lock:
            order = self._build_order(request, status="created")
            self.orders[order.order_id] = order
        logger.info(f"创建订单: {order.order_id}")
        return order

    async def ListOrders(self, request, context):
        """Server streaming RPC: yield the orders of ``request.user_id``.

        At most ``page_size`` orders are streamed (``DEFAULT_PAGE_SIZE`` when
        page_size is not positive); ``page_token`` is not consulted.
        """
        limit = request.page_size
        if limit <= 0:
            # A negative value would otherwise slice from the end of the list.
            limit = self.DEFAULT_PAGE_SIZE
        user_orders = [
            o for o in self.orders.values()
            if o.user_id == request.user_id
        ]
        for order in user_orders[:limit]:
            await asyncio.sleep(0.01)
            yield order

    async def ImportOrders(self, request_iterator, context):
        """Client streaming RPC: import every order sent by the client.

        Mirrors the synchronous servicer: records without ``user_id`` are
        counted as failed and reported in ``errors``.
        """
        result = order_pb2.ImportResult()
        async for request in request_iterator:
            result.total += 1
            if not request.user_id:
                result.failed += 1
                result.errors.append("缺少 user_id")
                continue
            async with self._lock:
                order = self._build_order(request, status="imported")
                self.orders[order.order_id] = order
            result.success += 1
        return result

    async def StreamOrderUpdates(self, request_iterator, context):
        """Bidirectional streaming RPC: one "processing" update per subscription.

        Each incoming message with ``subscribe`` set yields an update after a
        one-second simulated delay.
        """
        # Local import: ``datetime`` is not imported at module level here.
        from datetime import datetime

        try:
            async for subscription in request_iterator:
                if not subscription.subscribe:
                    continue
                await asyncio.sleep(1)  # simulate asynchronous processing
                yield order_pb2.OrderUpdate(
                    order_id=subscription.order_id,
                    status="processing",
                    message="订单正在处理中",
                    updated_at=datetime.now().isoformat(),
                )
        except asyncio.CancelledError:
            logger.info("客户端断开连接")
            # Re-raise so the cancellation reaches the gRPC runtime; the call
            # is already over, so setting a status code here has no effect.
            raise
async def serve():
    """Run the asyncio order service on port 50051 until it terminates.

    The server is created with a 10-worker thread pool, 64 MiB send/receive
    message limits and keepalive settings, gets an ``AsyncOrderServicer``
    registered, binds an insecure port on all interfaces and then awaits
    termination.
    """
    max_message_bytes = 64 * 1024 * 1024  # 64 MiB
    server_options = [
        ('grpc.max_receive_message_length', max_message_bytes),
        ('grpc.max_send_message_length', max_message_bytes),
        ('grpc.keepalive_time_ms', 30000),
        ('grpc.keepalive_timeout_ms', 10000),
    ]
    thread_pool = futures.ThreadPoolExecutor(max_workers=10)
    aio_server = grpc.aio.server(thread_pool, options=server_options)

    servicer = AsyncOrderServicer()
    order_pb2_grpc.add_OrderServiceServicer_to_server(servicer, aio_server)

    aio_server.add_insecure_port("[::]:50051")
    logger.info("异步 gRPC 服务启动,端口: 50051")
    await aio_server.start()
    await aio_server.wait_for_termination()
# asyncio.run(serve())
gRPC 客户端连接管理
class GrpcClientPool:
    """Pool of asyncio gRPC channels to a single target.

    Problems it addresses:
    - the cost of repeatedly creating and destroying connections
    - a single connection not sustaining high concurrency
    - replacing connections that have gone bad

    ``_created`` counts channels that are either idle in the pool or checked
    out by a caller; it never exceeds ``max_size``.
    """

    def __init__(self, target: str, max_size: int = 10):
        self.target = target
        self.max_size = max_size
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self._created = 0

    async def get_channel(self) -> grpc.aio.Channel:
        """Return a channel, reusing an idle one when possible.

        Idle channels are health-checked first; ones that fail are closed and
        released so they neither leak nor permanently use up pool capacity.
        When no idle channel is usable a new one is created, unless
        ``max_size`` channels already exist, in which case the call waits for
        one to be returned.
        """
        while True:
            try:
                channel = self._pool.get_nowait()
            except asyncio.QueueEmpty:
                break
            # Check whether the pooled connection is still usable.
            if await self._check_connection(channel):
                return channel
            await self._discard(channel)

        if self._created < self.max_size:
            self._created += 1
            return grpc.aio.insecure_channel(
                self.target,
                options=[
                    ('grpc.keepalive_time_ms', 30000),
                    ('grpc.keepalive_timeout_ms', 10000),
                    ('grpc.keepalive_permit_without_calls', True),
                ],
            )

        # Pool exhausted: wait until a caller hands a channel back.
        return await self._pool.get()

    async def return_channel(self, channel: grpc.aio.Channel):
        """Hand a channel back to the pool; close it if the pool is full."""
        try:
            self._pool.put_nowait(channel)
        except asyncio.QueueFull:
            await channel.close()

    async def _check_connection(self, channel) -> bool:
        """Return True when the channel becomes ready within one second.

        Uses the awaitable ``channel_ready()`` so the event loop is never
        blocked while waiting.
        """
        try:
            await asyncio.wait_for(channel.channel_ready(), timeout=1)
        except asyncio.TimeoutError:
            return False
        return True

    async def _discard(self, channel) -> None:
        """Close a pool-owned channel and release its slot."""
        # Decrement first so the count stays right even if close() raises.
        self._created = max(0, self._created - 1)
        await channel.close()

    async def close_all(self):
        """Close every idle channel currently held by the pool.

        Channels that are checked out at this moment are not affected.
        """
        while True:
            try:
                channel = self._pool.get_nowait()
            except asyncio.QueueEmpty:
                break
            await self._discard(channel)

# Protobuf 版本兼容与演进
def explain_protobuf_compatibility():
    """Print an example of evolving a proto message without breaking clients.

    Protobuf compatibility rules

    Safe changes (do not break compatibility):
        1. Adding a new field under a new, never-used field number.
        2. Removing a field, provided its number (and ideally its name) is
           marked ``reserved`` so it can never be reused.
        3. Adding new service methods.
        4. Adding new enum values, provided clients tolerate unknown values.
        5. Switching between int32, uint32, int64, uint64 and bool: these
           share a wire encoding, although values that do not fit the new
           type are truncated.

    Unsafe changes (break compatibility):
        1. Changing the number of an existing field.
        2. Changing a field to a type with a different wire encoding
           (for example int32 -> string, or int32 -> sint32).
        3. Renaming a field: the binary format is unaffected, but JSON/text
           serialization and generated code change.
        4. Changing between ``optional`` and ``required``.  This only applies
           to proto2; proto3 has no ``required`` fields.
        5. Removing enum values that clients may still send.

    Best practices:
        1. Never change a field number once it has been assigned.
        2. Mark the numbers of deleted fields as ``reserved``.
        3. Declare new fields ``optional`` when presence must be explicit.
        4. When deleting a field, note the reason and date in a comment.
        5. Check proto compatibility automatically in CI, e.g. with buf
           (https://buf.build).

    Returns:
        None.  The example schema evolution is written to stdout.
    """
    # Example: a safe evolution of one message over three versions.
    example = """
// v1
message User {
string name = 1;
int32 age = 2;
}
// v2 — 安全添加新字段
message User {
string name = 1;
int32 age = 2;
string email = 3; // 新增字段
repeated string tags = 4; // 新增字段
reserved 5; // 预留编号
}
// v3 — 安全删除字段
message User {
string name = 1;
// int32 age = 2; // 已删除
string email = 3;
repeated string tags = 4;
reserved 2, 5; // 标记已使用的编号
}
"""
    print(example)
explain_protobuf_compatibility()
gRPC 错误处理与超时
import grpc
from grpc import StatusCode
# 自定义 gRPC 错误
class OrderError(Exception):
    """Application-level order error carrying a gRPC status code.

    Attributes:
        code: the ``StatusCode`` to report for this error.
        message: human-readable description; also what ``str(error)`` returns.
        details: optional extra context; an empty dict when not supplied.
    """

    def __init__(self, code: StatusCode, message: str, details: dict = None):
        # Pass the message to Exception so str(), repr() and tracebacks show
        # it instead of an empty string.
        super().__init__(message)
        self.code = code
        self.message = message
        self.details = details or {}
def handle_grpc_error(error: grpc.RpcError):
    """Translate a gRPC error into an HTTP-style ``(status, message)`` pair.

    Args:
        error: the raised RPC error; its ``code()`` and ``details()`` are read.

    Returns:
        Tuple ``(http_code, text)``.  Status codes missing from the table map
        to ``(500, "未知错误")``.  The error's details are appended after
        ``": "`` only when they are non-empty.
    """
    error_map = {
        StatusCode.NOT_FOUND: (404, "资源不存在"),
        StatusCode.INVALID_ARGUMENT: (400, "参数错误"),
        StatusCode.DEADLINE_EXCEEDED: (504, "请求超时"),
        StatusCode.UNAUTHENTICATED: (401, "未认证"),
        StatusCode.PERMISSION_DENIED: (403, "权限不足"),
        StatusCode.INTERNAL: (500, "内部错误"),
        StatusCode.UNAVAILABLE: (503, "服务不可用"),
        StatusCode.RESOURCE_EXHAUSTED: (429, "请求过多"),
    }
    http_code, http_message = error_map.get(
        error.code(), (500, "未知错误")
    )
    details = error.details()
    if not details:
        # Avoid rendering "...: None" or a dangling ": " when the peer sent
        # no details.
        return http_code, http_message
    return http_code, f"{http_message}: {details}"
# 带超时的客户端调用
async def call_with_timeout(stub, timeout_seconds: float = 5.0):
    """Call ``GetOrder`` for order "ORD-001" with a per-call deadline.

    Args:
        stub: async ``OrderService`` stub used to issue the call.
        timeout_seconds: deadline handed to the RPC as ``timeout``.

    Returns:
        The response of ``GetOrder``.

    Raises:
        grpc.RpcError: re-raised unchanged; when its code is
            DEADLINE_EXCEEDED a timeout notice is printed first.
    """
    try:
        return await stub.GetOrder(
            order_pb2.OrderRequest(order_id="ORD-001"),
            timeout=timeout_seconds,
        )
    except grpc.RpcError as rpc_error:
        timed_out = rpc_error.code() == StatusCode.DEADLINE_EXCEEDED
        if timed_out:
            print(f"调用超时({timeout_seconds}s)")
        raise
# 重试策略
async def call_with_retry(stub, max_retries: int = 3, base_delay: float = 1.0):
    """Call ``GetOrder`` for order "ORD-001", retrying with exponential backoff.

    Only UNAVAILABLE and DEADLINE_EXCEEDED are retried.  The wait before
    retry ``k`` (0-based) is ``base_delay * 2 ** k`` seconds.

    Args:
        stub: async ``OrderService`` stub used to issue the call.
        max_retries: total number of attempts; must be at least 1.
        base_delay: delay in seconds before the first retry.

    Returns:
        The response of the first successful attempt.

    Raises:
        ValueError: if ``max_retries`` is less than 1.  Previously the
            function returned ``None`` without ever calling the service.
        grpc.RpcError: a non-retryable error, or the error of the last
            attempt.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    retryable = (StatusCode.UNAVAILABLE, StatusCode.DEADLINE_EXCEEDED)
    for attempt in range(max_retries):
        try:
            return await stub.GetOrder(
                order_pb2.OrderRequest(order_id="ORD-001")
            )
        except grpc.RpcError as e:
            is_last_attempt = attempt == max_retries - 1
            if e.code() not in retryable or is_last_attempt:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))