gRPC 与 Web 协议深入
大约 10 分钟约 2864 字
gRPC 与 Web 协议深入
简介
gRPC 是 Google 推出的高性能 RPC 框架,基于 Protocol Buffers 和 HTTP/2。理解 gRPC 的流式通信、拦截器、错误处理和 Web 集成(gRPC-Web),有助于构建高效的跨平台服务间通信。
特点
Protobuf 服务定义
服务与消息
// Protos/order.proto
syntax = "proto3";
package order;
option csharp_namespace = "OrderService.Grpc";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// 订单服务定义
service OrderService {
// 一元调用(请求-响应)
rpc GetOrder (GetOrderRequest) returns (OrderResponse);
rpc CreateOrder (CreateOrderRequest) returns (OrderResponse);
rpc UpdateOrder (UpdateOrderRequest) returns (OrderResponse);
rpc DeleteOrder (DeleteOrderRequest) returns (google.protobuf.Empty);
// 服务端流(返回多个响应)
rpc StreamOrderHistory (GetOrderRequest) returns (stream OrderEvent);
// 客户端流(接收多个请求)
rpc BatchCreateOrders (stream CreateOrderRequest) returns (BatchOrderResponse);
// 双向流
rpc ChatStream (stream ChatMessage) returns (stream ChatMessage);
}
// 消息定义
message GetOrderRequest {
string order_id = 1;
}
message CreateOrderRequest {
string user_id = 1;
repeated OrderItemRequest items = 2;
string shipping_address = 3;
string remarks = 4;
}
message OrderItemRequest {
string product_id = 1;
string product_name = 2;
int32 quantity = 3;
double price = 4;
}
message UpdateOrderRequest {
string order_id = 1;
string status = 2;
string tracking_number = 3;
}
message DeleteOrderRequest {
string order_id = 1;
string reason = 2;
}
message OrderResponse {
string order_id = 1;
string user_id = 2;
string status = 3;
repeated OrderItemResponse items = 4;
double total_amount = 5;
google.protobuf.Timestamp created_at = 6;
google.protobuf.Timestamp updated_at = 7;
}
message OrderItemResponse {
string product_id = 1;
string product_name = 2;
int32 quantity = 3;
double price = 4;
double subtotal = 5;
}
message OrderEvent {
string event_type = 1;
string description = 2;
google.protobuf.Timestamp timestamp = 3;
string data = 4;
}
message BatchOrderResponse {
int32 total = 1;
int32 succeeded = 2;
int32 failed = 3;
repeated string order_ids = 4;
repeated string errors = 5;
}
message ChatMessage {
string sender = 1;
string content = 2;
google.protobuf.Timestamp timestamp = 3;
}gRPC 服务实现
服务端实现
// gRPC 服务实现
public class OrderGrpcService : OrderService.OrderServiceBase
{
private readonly IOrderRepository _orderRepo;
private readonly ILogger<OrderGrpcService> _logger;
public OrderGrpcService(IOrderRepository orderRepo, ILogger<OrderGrpcService> logger)
{
_orderRepo = orderRepo;
_logger = logger;
}
// 一元调用
public override async Task<OrderResponse> GetOrder(GetOrderRequest request, ServerCallContext context)
{
_logger.LogInformation("获取订单: {OrderId}", request.OrderId);
var order = await _orderRepo.GetByIdAsync(Guid.Parse(request.OrderId), context.CancellationToken);
if (order == null)
{
throw new RpcException(new Status(StatusCode.NotFound, $"订单 {request.OrderId} 不存在"));
}
return MapToResponse(order);
}
public override async Task<OrderResponse> CreateOrder(CreateOrderRequest request, ServerCallContext context)
{
var order = new Order
{
Id = Guid.NewGuid(),
UserId = Guid.Parse(request.UserId),
Items = request.Items.Select(i => new OrderItem
{
ProductId = Guid.Parse(i.ProductId),
ProductName = i.ProductName,
Quantity = i.Quantity,
Price = (decimal)i.Price
}).ToList(),
Status = "Created"
};
await _orderRepo.SaveAsync(order, context.CancellationToken);
return MapToResponse(order);
}
// 服务端流
public override async Task StreamOrderHistory(GetOrderRequest request, IServerStreamWriter<OrderEvent> responseStream, ServerCallContext context)
{
var events = await _orderRepo.GetOrderEventsAsync(Guid.Parse(request.OrderId), context.CancellationToken);
foreach (var evt in events)
{
if (context.CancellationToken.IsCancellationRequested) break;
await responseStream.WriteAsync(new OrderEvent
{
EventType = evt.Type,
Description = evt.Description,
Timestamp = Timestamp.FromDateTime(evt.Timestamp),
Data = evt.Data ?? ""
});
// 模拟实时流
await Task.Delay(100, context.CancellationToken);
}
}
// 客户端流
public override async Task<BatchOrderResponse> BatchCreateOrders(IAsyncStreamReader<CreateOrderRequest> requestStream, ServerCallContext context)
{
var orderIds = new List<string>();
var errors = new List<string>();
var total = 0;
await foreach (var request in requestStream.ReadAllAsync(context.CancellationToken))
{
total++;
try
{
var order = new Order
{
Id = Guid.NewGuid(),
UserId = Guid.Parse(request.UserId),
Status = "Created"
};
await _orderRepo.SaveAsync(order, context.CancellationToken);
orderIds.Add(order.Id.ToString());
}
catch (Exception ex)
{
errors.Add($"Order {total}: {ex.Message}");
}
}
return new BatchOrderResponse
{
Total = total,
Succeeded = orderIds.Count,
Failed = errors.Count
};
}
// 双向流
public override async Task ChatStream(IAsyncStreamReader<ChatMessage> requestStream, IServerStreamWriter<ChatMessage> responseStream, ServerCallContext context)
{
await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken))
{
_logger.LogInformation("收到消息: {Sender}: {Content}", message.Sender, message.Content);
// 回复消息
await responseStream.WriteAsync(new ChatMessage
{
Sender = "Server",
Content = $"收到: {message.Content}",
Timestamp = Timestamp.FromDateTime(DateTime.UtcNow)
});
}
}
private OrderResponse MapToResponse(Order order)
{
var response = new OrderResponse
{
OrderId = order.Id.ToString(),
UserId = order.UserId.ToString(),
Status = order.Status,
TotalAmount = (double)order.TotalAmount,
CreatedAt = Timestamp.FromDateTime(order.CreatedAt),
UpdatedAt = Timestamp.FromDateTime(order.UpdatedAt)
};
response.Items.AddRange(order.Items.Select(i => new OrderItemResponse
{
ProductId = i.ProductId.ToString(),
ProductName = i.ProductName,
Quantity = i.Quantity,
Price = (double)i.Price,
Subtotal = (double)(i.Price * i.Quantity)
}));
return response;
}
}
// 注册
builder.Services.AddGrpc(options =>
{
options.EnableDetailedErrors = true; // 开发环境详细错误
options.MaxReceiveMessageSize = 4 * 1024 * 1024; // 4MB
options.MaxSendMessageSize = 4 * 1024 * 1024;
options.Interceptors.Add<ServerLoggingInterceptor>();
options.Interceptors.Add<ServerExceptionInterceptor>();
});
app.MapGrpcService<OrderGrpcService>();
app.MapGrpcReflectionService(); // gRPC 反射(用于 grpcurl 等工具)拦截器
服务端拦截器
// 日志拦截器
public class ServerLoggingInterceptor : Interceptor
{
private readonly ILogger<ServerLoggingInterceptor> _logger;
public ServerLoggingInterceptor(ILogger<ServerLoggingInterceptor> logger)
{
_logger = logger;
}
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request, ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
var stopwatch = Stopwatch.StartNew();
var method = context.Method;
_logger.LogInformation("gRPC 请求: {Method}", method);
try
{
var response = await continuation(request, context);
stopwatch.Stop();
_logger.LogInformation("gRPC 完成: {Method} ({Elapsed}ms)", method, stopwatch.ElapsedMilliseconds);
return response;
}
catch (Exception ex)
{
stopwatch.Stop();
_logger.LogError(ex, "gRPC 错误: {Method} ({Elapsed}ms)", method, stopwatch.ElapsedMilliseconds);
throw;
}
}
}
// 异常处理拦截器
public class ServerExceptionInterceptor : Interceptor
{
private readonly ILogger<ServerExceptionInterceptor> _logger;
public ServerExceptionInterceptor(ILogger<ServerExceptionInterceptor> logger) => _logger = logger;
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request, ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
try
{
return await continuation(request, context);
}
catch (BusinessException ex)
{
_logger.LogWarning(ex, "业务异常: {Message}", ex.Message);
var trailer = new Metadata { { "error-code", ex.Code } };
throw new RpcException(new Status(StatusCode.FailedPrecondition, ex.Message), trailer);
}
catch (ValidationException ex)
{
throw new RpcException(new Status(StatusCode.InvalidArgument, ex.Message));
}
catch (NotFoundException ex)
{
throw new RpcException(new Status(StatusCode.NotFound, ex.Message));
}
catch (Exception ex)
{
_logger.LogError(ex, "未处理异常");
throw new RpcException(new Status(StatusCode.Internal, "内部服务器错误"));
}
}
}客户端拦截器
// 客户端拦截器
public class ClientLoggingInterceptor : Interceptor
{
private readonly ILogger _logger;
public ClientLoggingInterceptor(ILogger<ClientLoggingInterceptor> logger) => _logger = logger;
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
var method = context.Method.FullName;
_logger.LogDebug("gRPC 调用: {Method}", method);
var call = continuation(request, context);
return new AsyncUnaryCall<TResponse>(
responseAsync: call.ResponseAsync.ContinueWith(task =>
{
if (task.IsFaulted)
{
var rpcEx = task.Exception?.InnerException as RpcException;
_logger.LogError("gRPC 调用失败: {Method}, Status: {Status}",
method, rpcEx?.StatusCode);
}
return task.Result;
}),
responseHeadersAsync: call.ResponseHeadersAsync,
getStatus: call.GetStatus,
getTrailers: call.GetTrailers,
dispose: call.Dispose);
}
}
// 注册客户端
builder.Services.AddGrpcClient<OrderService.OrderServiceClient>(options =>
{
options.Address = new Uri("https://order-service:8080");
})
.AddInterceptor<ClientLoggingInterceptor>()
.AddResilienceHandler("grpc-pipeline", builder =>
{
builder.AddRetry(new RetryStrategyOptions<HttpResponseMessage>
{
MaxRetryAttempts = 3,
Delay = TimeSpan.FromMilliseconds(200),
BackoffType = DelayBackoffType.Exponential
});
});gRPC-Web
浏览器端支持
// dotnet add package Grpc.AspNetCore.Web
// 启用 gRPC-Web
app.UseGrpcWeb(new GrpcWebOptions { DefaultEnabled = true });
app.MapGrpcService<OrderGrpcService>().EnableGrpcWeb();
// JavaScript 客户端
// npm install @grpc/grpc-js google-protobuf grpc-web优点
缺点
总结
gRPC 使用 Protobuf 定义服务契约,通过 dotnet-grpc 工具自动生成 C# 代码。支持四种通信模式:一元调用、服务端流、客户端流和双向流。拦截器机制类似 ASP.NET Core 中间件,支持日志、认证、异常处理等横切关注点。错误处理使用 RpcException + StatusCode + Trailer 元数据。gRPC-Web 通过 UseGrpcWeb() 支持浏览器端调用。建议服务间通信优先使用 gRPC,对外 API 使用 REST。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 框架能力的真正重点是它在请求链路中的位置和对上下游的影响。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 画清执行顺序、入参来源、失败返回和日志记录点。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 知道 API 名称,却不知道它应该放在请求链路的哪个位置。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续补齐协议选型、网关治理、端点可观测性和契约演进策略。
适用场景
- 当你准备把《gRPC 与 Web 协议深入》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《gRPC 与 Web 协议深入》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《gRPC 与 Web 协议深入》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《gRPC 与 Web 协议深入》最大的收益和代价分别是什么?
gRPC 错误处理进阶
Rich Error Details
// 使用 Google.Rpc.Status 传递丰富的错误信息
using Google.Rpc;
public class OrderGrpcService : OrderService.OrderServiceBase
{
public override async Task<OrderResponse> CreateOrder(CreateOrderRequest request, ServerCallContext context)
{
var errors = new List<BadRequest.Types.FieldViolation>();
if (string.IsNullOrEmpty(request.UserId))
errors.Add(new BadRequest.Types.FieldViolation { Field = "user_id", Description = "用户ID不能为空" });
if (request.Items.Count == 0)
errors.Add(new BadRequest.Types.FieldViolation { Field = "items", Description = "订单项不能为空" });
foreach (var item in request.Items)
{
if (item.Quantity <= 0)
errors.Add(new BadRequest.Types.FieldViolation
{
Field = $"items[{request.Items.IndexOf(item)}].quantity",
Description = "数量必须大于0"
});
}
if (errors.Count > 0)
{
var badRequest = new BadRequest();
badRequest.FieldViolations.AddRange(errors);
var status = new Google.Rpc.Status
{
Code = (int)StatusCode.InvalidArgument,
Message = "请求参数验证失败",
};
status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(badRequest));
var trailer = new Metadata();
trailer.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Status(StatusCode.InvalidArgument, "请求参数验证失败"), trailer);
}
// 正常处理逻辑...
return await CreateOrderInternal(request, context.CancellationToken);
}
}
// 客户端解析 Rich Error
try
{
var order = await client.CreateOrderAsync(new CreateOrderRequest());
}
catch (RpcException ex)
{
var statusEntry = ex.Trailers.FirstOrDefault(t => t.Key == "grpc-status-details-bin");
if (statusEntry != null)
{
var status = Google.Rpc.Status.Parser.ParseFrom(statusEntry.ValueBytes);
foreach (var detail in status.Details)
{
if (detail.Is(Google.Rpc.BadRequest.Descriptor))
{
var badRequest = detail.Unpack<Google.Rpc.BadRequest>();
foreach (var violation in badRequest.FieldViolations)
{
Console.WriteLine($"字段 {violation.Field}: {violation.Description}");
}
}
}
}
}gRPC 超时与重试策略
// 客户端重试配置
builder.Services.AddGrpcClient<OrderService.OrderServiceClient>(options =>
{
options.Address = new Uri("https://order-service:8080");
// 方法级超时配置
options.Intercept(new TimeoutInterceptor(TimeSpan.FromSeconds(30)));
})
.ConfigureChannel(channelOptions =>
{
// Channel 级配置
channelOptions.MaxRetryAttempts = 3;
channelOptions.HttpHandler = new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(5),
KeepAlivePingDelay = TimeSpan.FromSeconds(60),
KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
EnableMultipleHttp2Connections = true,
};
})
.AddResilienceHandler("grpc-retry", pipeline =>
{
// 重试策略
pipeline.AddRetry(new RetryStrategyOptions<HttpResponseMessage>
{
MaxRetryAttempts = 3,
Delay = TimeSpan.FromMilliseconds(200),
BackoffType = DelayBackoffType.Exponential,
UseJitter = true,
ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
.Handle<HttpRequestException>()
.HandleResult(r => r.StatusCode == System.Net.HttpStatusCode.ServiceUnavailable)
});
// 熔断策略
pipeline.AddCircuitBreaker(new CircuitBreakerStrategyOptions<HttpResponseMessage>
{
SamplingDuration = TimeSpan.FromSeconds(30),
FailureRatio = 0.5,
MinimumThroughput = 10,
BreakDuration = TimeSpan.FromSeconds(30)
});
// 超时策略
pipeline.AddTimeout(new TimeoutStrategyOptions
{
Timeout = TimeSpan.FromSeconds(30)
});
});
// 超时拦截器
public class TimeoutInterceptor : Interceptor
{
private readonly TimeSpan _timeout;
public TimeoutInterceptor(TimeSpan timeout) => _timeout = timeout;
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
var cts = new CancellationTokenSource(_timeout);
var options = context.Options.WithCancellationToken(cts.Token);
var newContext = new ClientInterceptorContext<TRequest, TResponse>(
context.Method, context.Host, options);
return continuation(request, newContext);
}
}gRPC 健康检查集成
// dotnet add package Grpc.HealthCheck
// 注册健康检查服务
builder.Services.AddGrpcHealthChecks()
.AddCheck("database", () => HealthCheckResult.Healthy())
.AddCheck("redis", () => HealthCheckResult.Healthy());
app.MapGrpcService<GrpcHealthCheckService>();
// 客户端健康检查
builder.Services.AddGrpcClient<OrderService.OrderServiceClient>(options =>
{
options.Address = new Uri("https://order-service:8080");
})
.AddInterceptor<HealthCheckInterceptor>();
public class HealthCheckInterceptor : Interceptor
{
private readonly Health.HealthClient _healthClient;
private readonly ILogger _logger;
public HealthCheckInterceptor(Health.HealthClient healthClient, ILogger<HealthCheckInterceptor> logger)
{
_healthClient = healthClient;
_logger = logger;
}
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
// 定期检查服务健康状态
try
{
var health = _healthClient.Check(new HealthCheckRequest { Service = "OrderService" });
if (health.Status != HealthCheckResponse.Types.ServingStatus.Serving)
{
_logger.LogWarning("gRPC 服务不健康: {Status}", health.Status);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "健康检查失败");
}
return continuation(request, context);
}
}gRPC 与 REST 共存
// 同一 ASP.NET Core 应用同时支持 gRPC 和 REST
var app = builder.Build();
// gRPC 端点
app.MapGrpcService<OrderGrpcService>();
app.MapGrpcReflectionService();
// gRPC-Web 支持
app.UseGrpcWeb(new GrpcWebOptions { DefaultEnabled = true });
// REST 端点(使用 JSON)
app.MapGet("/api/orders/{id}", async (string id, IOrderRepository repo) =>
{
var order = await repo.GetByIdAsync(Guid.Parse(id));
return order != null ? Results.Ok(order) : Results.NotFound();
});
app.MapPost("/api/orders", async (CreateOrderRequest request, IOrderRepository repo) =>
{
// REST 端点可以复用 gRPC 的 protobuf 消息类型
var order = new Order
{
Id = Guid.NewGuid(),
UserId = request.UserId,
Status = "Created"
};
await repo.SaveAsync(order);
return Results.Created($"/api/orders/{order.Id}", order);
});
// 健康检查端点
app.MapHealthChecks("/health");