gRPC 高性能通信
大约 9 分钟约 2640 字
gRPC 高性能通信
简介
gRPC 是一种基于 HTTP/2 和 Protocol Buffers 的高性能 RPC 框架,非常适合微服务之间的内部通信、双向流式数据传输和强类型契约场景。对 ASP.NET Core 项目来说,gRPC 的价值不只是“比 JSON 快”,更在于它把接口契约、序列化、流式通信和多语言协作放到了一套统一模型里。
特点
实现
定义 Protobuf 契约
syntax = "proto3";
option csharp_namespace = "OrderService.Grpc";
service OrderService {
rpc GetOrder (GetOrderRequest) returns (OrderResponse);
rpc CreateOrder (CreateOrderRequest) returns (OrderResponse);
rpc StreamOrderUpdates (StreamOrderRequest) returns (stream OrderUpdate);
rpc UploadOrderItems (stream OrderItemRequest) returns (UploadResponse);
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
message GetOrderRequest {
int32 order_id = 1;
}
message CreateOrderRequest {
string user_id = 1;
repeated OrderItem items = 2;
string remark = 3;
}
message OrderItem {
int32 product_id = 1;
string product_name = 2;
int32 quantity = 3;
double price = 4;
}
message OrderResponse {
int32 id = 1;
string order_no = 2;
string user_id = 3;
double total_amount = 4;
string status = 5;
}
message StreamOrderRequest {
int32 order_id = 1;
}
message OrderUpdate {
string status = 1;
string message = 2;
int64 timestamp = 3;
}
message OrderItemRequest {
int32 product_id = 1;
int32 quantity = 2;
}
message UploadResponse {
int32 total_items = 1;
bool success = 2;
}
message ChatMessage {
string sender = 1;
string content = 2;
}<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.62.0" />
<Protobuf Include="Protos\order.proto" GrpcServices="Server" />
</ItemGroup>
</Project>契约设计建议:
- 先设计 message,再设计 service
- 字段编号不要随意变更
- 删除字段要谨慎,兼容性优先ASP.NET Core 服务端接入
using Grpc.Core;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc(options =>
{
options.EnableDetailedErrors = true;
options.MaxReceiveMessageSize = 4 * 1024 * 1024;
options.MaxSendMessageSize = 4 * 1024 * 1024;
});
var app = builder.Build();
app.MapGrpcService<OrderServiceImpl>();
app.MapGet("/", () => "Use a gRPC client to communicate with this endpoint.");
app.Run();public class OrderServiceImpl : OrderService.OrderServiceBase
{
private readonly ILogger<OrderServiceImpl> _logger;
private readonly IOrderRepository _repository;
public OrderServiceImpl(ILogger<OrderServiceImpl> logger, IOrderRepository repository)
{
_logger = logger;
_repository = repository;
}
public override async Task<OrderResponse> GetOrder(GetOrderRequest request, ServerCallContext context)
{
var order = await _repository.GetByIdAsync(request.OrderId, context.CancellationToken);
if (order is null)
throw new RpcException(new Status(StatusCode.NotFound, $"订单 {request.OrderId} 不存在"));
return new OrderResponse
{
Id = order.Id,
OrderNo = order.OrderNo,
UserId = order.UserId,
TotalAmount = (double)order.TotalAmount,
Status = order.Status
};
}
public override async Task<OrderResponse> CreateOrder(CreateOrderRequest request, ServerCallContext context)
{
var order = await _repository.CreateAsync(request.UserId, request.Items, request.Remark, context.CancellationToken);
_logger.LogInformation("Order created via gRPC. OrderId={OrderId}", order.Id);
return new OrderResponse
{
Id = order.Id,
OrderNo = order.OrderNo,
UserId = order.UserId,
TotalAmount = (double)order.TotalAmount,
Status = order.Status
};
}
}流式通信
// 服务端流:持续推送订单状态
public override async Task StreamOrderUpdates(
StreamOrderRequest request,
IServerStreamWriter<OrderUpdate> responseStream,
ServerCallContext context)
{
while (!context.CancellationToken.IsCancellationRequested)
{
var status = await _repository.GetStatusAsync(request.OrderId, context.CancellationToken);
await responseStream.WriteAsync(new OrderUpdate
{
Status = status.Status,
Message = status.Message,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds()
});
await Task.Delay(2000, context.CancellationToken);
}
}// 客户端流:批量上传订单项
public override async Task<UploadResponse> UploadOrderItems(
IAsyncStreamReader<OrderItemRequest> requestStream,
ServerCallContext context)
{
var count = 0;
await foreach (var item in requestStream.ReadAllAsync(context.CancellationToken))
{
_logger.LogInformation("Item received: ProductId={ProductId}, Qty={Quantity}", item.ProductId, item.Quantity);
count++;
}
return new UploadResponse { TotalItems = count, Success = true };
}// 双向流:聊天室 / 实时协作类场景
public override async Task Chat(
IAsyncStreamReader<ChatMessage> requestStream,
IServerStreamWriter<ChatMessage> responseStream,
ServerCallContext context)
{
await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken))
{
await responseStream.WriteAsync(new ChatMessage
{
Sender = "server",
Content = $"已收到 {message.Sender}: {message.Content}"
});
}
}.NET 客户端调用
using Grpc.Net.Client;
var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new OrderService.OrderServiceClient(channel);
var response = await client.GetOrderAsync(new GetOrderRequest { OrderId = 1001 });
Console.WriteLine($"订单状态: {response.Status}");// 服务端流客户端
using var call = client.StreamOrderUpdates(new StreamOrderRequest { OrderId = 1001 });
await foreach (var update in call.ResponseStream.ReadAllAsync())
{
Console.WriteLine($"状态更新: {update.Status} / {update.Message}");
}// gRPC client 注入方式(推荐)
builder.Services.AddGrpcClient<OrderService.OrderServiceClient>(options =>
{
options.Address = new Uri("https://order-service.internal");
});gRPC 拦截器
// 服务端拦截器 — 日志、认证、指标
public class ServerLoggingInterceptor : Interceptor
{
private readonly ILogger<ServerLoggingInterceptor> _logger;
public ServerLoggingInterceptor(ILogger<ServerLoggingInterceptor> logger)
{
_logger = logger;
}
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request,
ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
var method = context.Method;
_logger.LogInformation("gRPC call: {Method} started", method);
var sw = Stopwatch.StartNew();
try
{
var response = await continuation(request, context);
sw.Stop();
_logger.LogInformation(
"gRPC call: {Method} completed in {Elapsed}ms, Status={StatusCode}",
method, sw.ElapsedMilliseconds, context.Status.StatusCode);
return response;
}
catch (Exception ex)
{
sw.Stop();
_logger.LogError(ex,
"gRPC call: {Method} failed in {Elapsed}ms",
method, sw.ElapsedMilliseconds);
throw;
}
}
}
// 客户端拦截器 — 添加认证头、重试
public class ClientAuthInterceptor : Interceptor
{
private readonly ITokenProvider _tokenProvider;
public ClientAuthInterceptor(ITokenProvider tokenProvider)
{
_tokenProvider = tokenProvider;
}
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
var token = _tokenProvider.GetToken();
var headers = context.Options.Headers ?? new Metadata();
headers.Add("Authorization", $"Bearer {token}");
var options = context.Options.WithHeaders(headers);
return base.AsyncUnaryCall(request,
new ClientInterceptorContext<TRequest, TResponse>(
context.Method, context.Host, options),
continuation);
}
}
// 注册拦截器
builder.Services.AddGrpc(options =>
{
options.Interceptors.Add<ServerLoggingInterceptor>();
options.EnableDetailedErrors = builder.Environment.IsDevelopment();
});gRPC 错误处理与元数据
// gRPC 错误处理 — 使用 Status 对象
public override async Task<OrderResponse> GetOrder(
GetOrderRequest request, ServerCallContext context)
{
// 参数验证错误
if (request.OrderId <= 0)
{
throw new RpcException(new Status(StatusCode.InvalidArgument, "订单ID必须大于0"));
}
var order = await _repository.GetByIdAsync(request.OrderId);
if (order == null)
{
throw new RpcException(new Status(StatusCode.NotFound, $"订单 {request.OrderId} 不存在"));
}
// 权限不足
var userId = GetUserIdFromContext(context);
if (order.UserId != userId)
{
throw new RpcException(new Status(StatusCode.PermissionDenied, "无权访问此订单"));
}
return MapToResponse(order);
}
// 自定义错误详情(gRPC Rich Error Model)
var status = new Status(StatusCode.InvalidArgument, "参数验证失败");
var trailers = new Metadata
{
{ "error-code", "VALIDATION_ERROR" },
{ "error-details", "Quantity must be positive" },
{ "retry-after", "0" }
};
throw new RpcException(status, trailers);
// 客户端处理错误
try
{
var response = await client.GetOrderAsync(new GetOrderRequest { OrderId = 1001 });
}
catch (RpcException ex)
{
switch (ex.StatusCode)
{
case StatusCode.NotFound:
Console.WriteLine("订单不存在");
break;
case StatusCode.DeadlineExceeded:
Console.WriteLine("请求超时");
break;
case StatusCode.PermissionDenied:
Console.WriteLine("权限不足");
break;
case StatusCode.Unavailable:
Console.WriteLine("服务不可用");
break;
default:
Console.WriteLine($"gRPC 错误: {ex.Status}");
break;
}
}
// Deadline 设置 — 避免长时间等待
var response = await client.GetOrderAsync(
new GetOrderRequest { OrderId = 1001 },
deadline: DateTime.UtcNow.AddSeconds(5));
// gRPC 元数据传递
// 服务端读取客户端元数据
var userAgent = context.RequestHeaders.GetValue("user-agent");
var traceId = context.RequestHeaders.GetValue("x-trace-id");
// 服务端写入响应元数据
context.ResponseHeaders.Add("x-request-id", Guid.NewGuid().ToString());
context.WriteResponseTrailersAsync(new Metadata { { "custom-trailer", "value" } });gRPC 健康检查
// gRPC 健康检查协议
// dotnet add package Grpc.HealthCheck
// dotnet add package Grpc.AspNetCore.HealthChecks
builder.Services.AddGrpcHealthChecks()
.AddCheck("order-service", () => HealthCheckResult.Healthy())
.AddCheck("database", () =>
{
// 检查数据库连接
return _db.Database.CanConnect()
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("数据库连接失败");
});
var app = builder.Build();
app.MapGrpcHealthChecksService();
// 客户端健康检查
var healthClient = channel.CreateHealthClient();
var response = await healthClient.CheckHealthAsync(new HealthCheckRequest { Service = "order-service" });
Console.WriteLine($"Service status: {response.Status}");Protobuf 契约演进
// 契约演进规则:
// 1. 字段编号一旦使用,永远不能重复使用
// 2. 可以添加新字段(旧客户端会忽略新字段)
// 3. 可以删除字段(不能重用编号)
// 4. 不能更改字段类型
// 5. optional 字段兼容性最好
// 兼容性演进示例
syntax = "proto3";
message OrderResponse {
int32 id = 1; // 永久保留
string order_no = 2; // 永久保留
string user_id = 3; // 永久保留
double total_amount = 4; // 永久保留
string status = 5; // 永久保留
// v2 新增字段
string currency = 6; // 新增:币种
repeated string tags = 7; // 新增:标签
string discount_code = 8; // 新增:折扣码
// 已废弃字段(不要删除,标记为 reserved)
// string old_field = 9; // 已废弃
reserved 9; // 标记编号为已保留
reserved "old_field"; // 标记字段名为已保留
}
// 枚举演进
enum OrderStatus {
ORDER_STATUS_UNSPECIFIED = 0; // 必须有默认值
ORDER_STATUS_PENDING = 1;
ORDER_STATUS_PAID = 2;
ORDER_STATUS_SHIPPED = 3;
ORDER_STATUS_COMPLETED = 4;
ORDER_STATUS_CANCELLED = 5;
// v2 新增状态
ORDER_STATUS_REFUNDED = 6;
ORDER_STATUS_PARTIAL_REFUND = 7;
reserved 8; // 保留编号
}
// Oneof — 互斥字段
message PaymentRequest {
oneof payment_method {
AlipayPayment alipay = 1;
WechatPayment wechat = 2;
BankCardPayment bank_card = 3;
}
}
message AlipayPayment {
string trade_no = 1;
string buyer_id = 2;
}
message WechatPayment {
string transaction_id = 1;
string open_id = 2;
}
message BankCardPayment {
string card_no = 1;
string bank_code = 2;
}gRPC 反射与调试
// 启用 gRPC 反射(便于调试工具连接)
builder.Services.AddGrpcReflection();
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.MapGrpcReflectionService();
}
// 使用 grpcurl 命令行工具调试
// grpcurl -plaintext localhost:5001 list
// grpcurl -plaintext localhost:5001 describe OrderService
// grpcurl -plaintext -d '{"order_id": 1001}' localhost:5001 OrderService/GetOrder
// 使用 gRPC UI(Web 调试界面)
// dotnet add package Grpc.AspNetCore.Server.Reflection
// 访问 http://localhost:5001/grpcui
// gRPC 与 REST 共存
// ASP.NET Core 支持在同一服务中同时提供 gRPC 和 REST API
var app = builder.Build();
app.MapGrpcService<OrderServiceImpl>(); // gRPC 端点
app.MapControllers(); // REST 端点
app.MapGet("/health", () => "OK"); // 健康检查
// gRPC-Web 支持(浏览器调用 gRPC)
// dotnet add package Grpc.AspNetCore.Web
builder.Services.AddGrpc();
builder.Services.AddGrpcWeb();
var app = builder.Build();
app.UseGrpcWeb(); // 必须在 MapGrpcService 之前
// 前端 JavaScript 调用 gRPC-Web
// import { OrderServiceClient } from './generated/order_grpc_web_pb';
// const client = new OrderServiceClient('https://api.example.com');
// client.getOrder({ orderId: 1001 }, (err, response) => { ... });优点
缺点
总结
gRPC 最适合解决的是“服务与服务之间如何高效、强类型、低延迟地通信”,而不是替代所有 HTTP API。对 ASP.NET Core 项目来说,真正的落地点通常在微服务内部调用、流式推送和多语言内部服务协作,而不是前端页面直接访问。
关键知识点
.proto是契约中心,字段编号兼容性必须认真维护。- gRPC 最大优势不只是速度,还有强类型和流式模型。
- 内部服务更适合 gRPC,外部开放 API 往往仍以 REST 为主。
- 流式通信很强,但也需要考虑超时、重连和资源释放。
项目落地视角
- 订单、支付、库存、用户中心之间的内部服务调用很适合 gRPC。
- 实时状态推送、设备状态流和日志流也适合 gRPC streaming。
- BFF / 前端直连层不一定适合纯 gRPC,更常见是 REST 或 gRPC-Web。
- 在服务治理中,要把超时、认证、日志和重试一起配上。
常见误区
- 觉得 gRPC 一定比 REST 更适合所有场景。
- 只会写一元调用,不理解流式场景边界。
- 随意修改 proto 字段编号,破坏兼容性。
- 用 gRPC 做开放 API,却忽略浏览器和网关适配问题。
进阶路线
- 学习 gRPC 拦截器、认证、Deadline、Retry 与流控机制。
- 研究 gRPC-Web、Envoy、API Gateway 的接入方式。
- 将 gRPC 与 OpenTelemetry、熔断、服务发现一起治理。
- 深入理解 Protobuf 兼容性演进策略。
适用场景
- 微服务内部调用。
- 实时状态流与双向流交互。
- 多语言后端系统的统一协议层。
- 高吞吐、低延迟、强契约的服务间通信。
落地建议
- 先明确是否真的是“内部服务通信”场景,再用 gRPC。
- 所有 proto 变更都要做兼容性审查。
- 对高频调用设置超时、重试和日志追踪。
- 服务端和客户端都要补健康检查和链路监控。
排错清单
- 连不上:先查 HTTP/2、TLS、端口和网关支持。
- 反序列化失败:先查 proto 字段定义和生成代码版本。
- 流式卡住:先查 cancellation、超时和客户端读取循环。
- 性能没提升:先查是不是瓶颈根本不在序列化层。
