gRPC 流式通信与拦截器
大约 9 分钟约 2798 字
gRPC 流式通信与拦截器
简介
gRPC 是高性能的 RPC 框架,使用 Protocol Buffers 序列化和 HTTP/2 传输。gRPC 支持四种通信模式:一元、服务端流、客户端流和双向流。拦截器机制类似 ASP.NET Core 中间件,用于横切关注点处理。
特点
四种通信模式
一元调用(Unary)
// order.proto
syntax = "proto3";
package order;
service OrderService {
// 一元调用:请求-响应
rpc GetOrder (GetOrderRequest) returns (Order);
// 服务端流:一个请求,多个响应
rpc StreamOrders (StreamOrdersRequest) returns (stream OrderUpdate);
// 客户端流:多个请求,一个响应
rpc UploadOrderItems (stream OrderItem) returns (UploadSummary);
// 双向流:多个请求,多个响应
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
message GetOrderRequest { int32 order_id = 1; }
message Order {
int32 id = 1;
string customer = 2;
repeated OrderItem items = 3;
double total = 4;
}// 服务端实现
public class OrderServiceImpl : OrderService.OrderServiceBase
{
// 一元调用
public override async Task<Order> GetOrder(GetOrderRequest request, ServerCallContext context)
{
var order = await _orderRepo.GetAsync(request.OrderId);
if (order == null)
throw new RpcException(new Status(StatusCode.NotFound, $"订单 {request.OrderId} 不存在"));
return new Order
{
Id = order.Id,
Customer = order.CustomerName,
Total = order.TotalAmount
};
}
// 服务端流
public override async Task StreamOrders(StreamOrdersRequest request,
IServerStreamWriter<OrderUpdate> responseStream, ServerCallContext context)
{
await foreach (var update in _orderService.SubscribeUpdatesAsync(context.CancellationToken))
{
if (context.CancellationToken.IsCancellationRequested) break;
await responseStream.WriteAsync(new OrderUpdate
{
OrderId = update.OrderId,
Status = update.Status,
Message = update.Message
});
}
}
// 客户端流
public override async Task<UploadSummary> UploadOrderItems(
IAsyncStreamReader<OrderItem> requestStream, ServerCallContext context)
{
var items = new List<OrderItem>();
await foreach (var item in requestStream.ReadAllAsync(context.CancellationToken))
{
items.Add(item);
}
// 批量处理
await _orderService.ProcessItemsAsync(items);
return new UploadSummary
{
TotalItems = items.Count,
TotalAmount = items.Sum(i => i.Price * i.Quantity)
};
}
// 双向流
public override async Task Chat(
IAsyncStreamReader<ChatMessage> requestStream,
IServerStreamWriter<ChatMessage> responseStream,
ServerCallContext context)
{
// 同时处理读写
var readTask = Task.Run(async () =>
{
await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken))
{
await _chatService.ProcessMessageAsync(message);
}
});
var writeTask = Task.Run(async () =>
{
await foreach (var message in _chatService.GetMessagesAsync(context.CancellationToken))
{
await responseStream.WriteAsync(message);
}
});
await Task.WhenAll(readTask, writeTask);
}
}拦截器
服务端拦截器
// 服务端拦截器 — 类似 ASP.NET Core 中间件
public class ServerLoggingInterceptor : Interceptor
{
private readonly ILogger _logger;
public ServerLoggingInterceptor(ILogger<ServerLoggingInterceptor> logger)
=> _logger = logger;
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request, ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
var sw = Stopwatch.StartNew();
try
{
_logger.LogInformation("gRPC 调用: {Method}", context.Method);
var response = await continuation(request, context);
sw.Stop();
_logger.LogInformation("gRPC 完成: {Method} ({Ms}ms)", context.Method, sw.ElapsedMilliseconds);
return response;
}
catch (Exception ex)
{
sw.Stop();
_logger.LogError(ex, "gRPC 错误: {Method} ({Ms}ms)", context.Method, sw.ElapsedMilliseconds);
throw;
}
}
}
// 认证拦截器
public class AuthInterceptor : Interceptor
{
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request, ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
var authHeader = context.RequestHeaders.GetValue("authorization");
if (string.IsNullOrEmpty(authHeader))
{
throw new RpcException(new Status(StatusCode.Unauthenticated, "缺少认证头"));
}
// 验证 Token
var token = authHeader.Replace("Bearer ", "");
if (!await ValidateTokenAsync(token))
{
throw new RpcException(new Status(StatusCode.Unauthenticated, "无效 Token"));
}
return await continuation(request, context);
}
}
// 注册拦截器
builder.Services.AddGrpc(options =>
{
options.Interceptors.Add<ServerLoggingInterceptor>();
options.Interceptors.Add<AuthInterceptor>();
options.EnableDetailedErrors = true;
options.MaxReceiveMessageSize = 4 * 1024 * 1024; // 4MB
options.MaxSendMessageSize = 4 * 1024 * 1024;
});客户端拦截器
// 客户端拦截器
public class ClientRetryInterceptor : Interceptor
{
private readonly ILogger _logger;
public override async Task<TResponse> UnaryHandler<TRequest, TResponse>(
TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
UnaryCallContinuation<TRequest, TResponse> continuation)
{
var maxRetries = 3;
for (int i = 0; i < maxRetries; i++)
{
try
{
return await continuation(request, context);
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable && i < maxRetries - 1)
{
_logger.LogWarning("gRPC 重试 {Attempt}/{Max}: {Method}", i + 1, maxRetries, context.Method);
await Task.Delay(TimeSpan.FromMilliseconds(100 * (i + 1)));
}
}
throw new RpcException(new Status(StatusCode.Unavailable, "达到最大重试次数"));
}
}
// 创建客户端
var channel = GrpcChannel.ForAddress("https://localhost:5001", new GrpcChannelOptions
{
HttpHandler = new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(5),
KeepAlivePingDelay = TimeSpan.FromSeconds(60),
KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
EnableMultipleHttp2Connections = true
}
});
var client = new OrderService.OrderServiceClient(channel)
.Intercept(new ClientRetryInterceptor());健康检查
gRPC 健康检查协议
// NuGet: Grpc.AspNetCore.HealthChecks
// NuGet: Grpc.Net.Client
// 服务端配置健康检查
builder.Services.AddGrpcHealthChecks()
.AddCheck("order-service", () => HealthCheckResult.Healthy())
.AddCheck<DatabaseHealthCheck>("database");
var app = builder.Build();
app.MapGrpcHealthChecksService();
// 自定义健康检查
public class DatabaseHealthCheck : IHealthCheck
{
private readonly AppDbContext _db;
public DatabaseHealthCheck(AppDbContext db) => _db = db;
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken ct = default)
{
try
{
await _db.Database.ExecuteSqlRawAsync("SELECT 1", ct);
return HealthCheckResult.Healthy("数据库连接正常");
}
catch (Exception ex)
{
return HealthCheckResult.Degraded($"数据库异常: {ex.Message}");
}
}
}
// 客户端健康检查
var healthClient = new Health.HealthClient(channel);
var response = await healthClient.CheckAsync(new HealthCheckRequest { Service = "order-service" });
if (response.Status != HealthCheckResponse.Types.ServingStatus.Serving)
{
Console.WriteLine("服务不可用");
}截止时间与取消
Deadline 传播
// 服务端 — 设置方法级截止时间
public class OrderServiceImpl : OrderService.OrderServiceBase
{
public override async Task<Order> GetOrder(
GetOrderRequest request, ServerCallContext context)
{
// 检查客户端传递的截止时间
var remaining = context.Deadline - DateTime.UtcNow;
if (remaining < TimeSpan.FromSeconds(1))
{
throw new RpcException(new Status(StatusCode.DeadlineExceeded,
"剩余时间不足"));
}
// 传递 CancellationToken 给下游
var order = await _orderRepo.GetAsync(
request.OrderId, context.CancellationToken);
return MapToProto(order);
}
}
// 客户端 — 设置截止时间
var client = new OrderService.OrderServiceClient(channel);
// 方式 1:通过 CallOptions 设置
var response = await client.GetOrderAsync(
new GetOrderRequest { OrderId = 1 },
deadline: DateTime.UtcNow.AddSeconds(5));
// 方式 2:通过 CancellationToken 取消
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
try
{
var response = await client.GetOrderAsync(
new GetOrderRequest { OrderId = 1 },
cancellationToken: cts.Token);
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("请求被取消");
}
// 方式 3:使用 WithDeadline 扩展
var call = client.StreamOrders(
new StreamOrdersRequest(),
deadline: DateTime.UtcNow.AddSeconds(30));
await foreach (var update in call.ResponseStream.ReadAllAsync())
{
// 处理流式数据
}反射与元数据
Metadata 传递
// 客户端发送自定义 Metadata
var client = new OrderService.OrderServiceClient(channel);
var headers = new Metadata
{
{ "request-id", Guid.NewGuid().ToString() },
{ "source", "web-app" },
{ "trace-id", Activity.Current?.TraceId.ToString() ?? "" }
};
var call = client.GetOrderAsync(
new GetOrderRequest { OrderId = 1 },
headers);
var response = await call.ResponseAsync;
// 服务端读取 Metadata
public override async Task<Order> GetOrder(
GetOrderRequest request, ServerCallContext context)
{
var requestId = context.RequestHeaders.GetValue("request-id");
var source = context.RequestHeaders.GetValue("source");
_logger.LogInformation("请求 {RequestId} 来自 {Source}", requestId, source);
// 写入响应头
context.ResponseHeaders.Add("x-processing-time", "42ms");
context.WriteResponseTrailers(new Metadata
{
{ "trace-id", Activity.Current?.TraceId.ToString() ?? "" }
});
return await GetOrderInternal(request.OrderId);
}
// 客户端读取响应头
var call = client.GetOrderAsync(
new GetOrderRequest { OrderId = 1 },
headers);
var headers = await call.ResponseHeadersAsync;
var response = await call.ResponseAsync;
var trailers = call.GetTrailers();
var traceId = trailers.GetValue("trace-id");服务器反射与客户端负载均衡
服务发现
// 服务端启用反射(方便客户端动态发现服务)
// NuGet: Grpc.AspNetCore.Server.Reflection
builder.Services.AddGrpcReflection();
app.MapGrpcReflectionService();
// 客户端负载均衡
var channel = GrpcChannel.ForAddress("https://dns:///order-service:443",
new GrpcChannelOptions
{
HttpHandler = new SocketsHttpHandler
{
// 启用连接池和负载均衡
EnableMultipleHttp2Connections = true,
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(5),
KeepAlivePingDelay = TimeSpan.FromSeconds(60),
KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
// 连接池大小
PooledConnectionLifetime = TimeSpan.FromMinutes(30)
},
// 服务配置(负载均衡策略)
ServiceConfig = new ServiceConfig
{
LoadBalancingConfigs = { new LoadBalancingConfig("round_robin") },
MethodConfigs =
{
new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
{
MaxAttempts = 3,
InitialBackoff = TimeSpan.FromSeconds(1),
MaxBackoff = TimeSpan.FromSeconds(5),
BackoffMultiplier = 2,
RetryableStatusCodes = { StatusCode.Unavailable }
}
}
}
}
});Proto 版本演进
向后兼容策略
// 向后兼容的字段变更规则:
// 1. 可以添加新字段(使用新的字段编号)
// 2. 不能删除或重用字段编号
// 3. 不能修改字段类型
// 4. 可以将 required 改为 optional
// 5. 可以添加新的枚举值
// V1 版本
message Order {
int32 id = 1;
string customer = 2;
repeated OrderItem items = 3;
double total = 4;
}
// V2 版本(向后兼容)
message Order {
int32 id = 1;
string customer = 2;
repeated OrderItem items = 3;
double total = 4;
string currency = 5; // 新增字段
OrderStatus status = 6; // 新增字段
int64 created_at = 7; // 新增字段
}
// 不兼容的变更(必须创建新消息类型):
// message OrderV3 { ... } — 全新消息类型
// 客户端和服务端可以独立升级错误处理
gRPC 状态码与富错误
// gRPC 状态码
// OK — 成功
// Cancelled — 被取消
// Unknown — 未知错误
// InvalidArgument — 无效参数
// DeadlineExceeded — 超时
// NotFound — 未找到
// AlreadyExists — 已存在
// PermissionDenied — 权限不足
// ResourceExhausted — 资源耗尽
// Unauthenticated — 未认证
// FailedPrecondition — 前置条件失败
// Aborted — 中止
// OutOfRange — 超出范围
// Unimplemented — 未实现
// Internal — 内部错误
// Unavailable — 服务不可用
// DataLoss — 数据丢失
// 抛出错误
throw new RpcException(new Status(StatusCode.NotFound, "订单不存在"));
// 富错误信息(Rich Error Details)
try
{
await client.GetOrderAsync(new GetOrderRequest { OrderId = 999 });
}
catch (RpcException ex)
{
Console.WriteLine($"状态码: {ex.StatusCode}");
Console.WriteLine($"消息: {ex.Status.Detail}");
// 读取错误详情
var errorDetail = ex.Trailers.GetValue("error-detail-bin");
if (errorDetail != null)
{
// 解析自定义错误详情
}
}优点
缺点
总结
gRPC 四种通信模式:Unary(一元)、Server Streaming(服务端流)、Client Streaming(客户端流)、Bidirectional Streaming(双向流)。拦截器在服务端和客户端都可用,用于日志、认证、重试等横切关注点。错误通过 RpcException + StatusCode 传递,支持富错误详情。截止时间(Deadline)和 CancellationToken 控制调用超时。客户端通过 GrpcChannel 创建,配置连接池和 HTTP/2 参数。拦截器通过 .Intercept() 链式添加。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 框架能力的真正重点是它在请求链路中的位置和对上下游的影响。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 画清执行顺序、入参来源、失败返回和日志记录点。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 知道 API 名称,却不知道它应该放在请求链路的哪个位置。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续补齐协议选型、网关治理、端点可观测性和契约演进策略。
适用场景
- 当你准备把《gRPC 流式通信与拦截器》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《gRPC 流式通信与拦截器》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《gRPC 流式通信与拦截器》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《gRPC 流式通信与拦截器》最大的收益和代价分别是什么?
