AI 模型部署与推理
大约 11 分钟约 3392 字
AI 模型部署与推理
简介
模型部署的重点,不是“模型能不能跑起来”,而是“模型能否稳定、低延迟、可观测地对外提供推理能力”。从训练产物导出,到推理服务封装、容器化部署、CPU / GPU 资源管理、批处理优化、滚动发布和监控告警,整条链路都决定了模型是否真正能进入生产环境。
特点
实现
模型导出:PyTorch → ONNX
import torch
import torch.onnx
class SimpleClassifier(torch.nn.Module):
def __init__(self):
super().__init__()
self.net = torch.nn.Sequential(
torch.nn.Linear(784, 256),
torch.nn.ReLU(),
torch.nn.Linear(256, 10)
)
def forward(self, x):
return self.net(x)
model = SimpleClassifier().eval()
dummy_input = torch.randn(1, 784)
torch.onnx.export(
model,
dummy_input,
"model.onnx",
input_names=["input"],
output_names=["output"],
dynamic_axes={
"input": {0: "batch_size"},
"output": {0: "batch_size"}
},
opset_version=17
)import onnx
onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)
print("ONNX model is valid")导出阶段要确认:
- 输入输出 shape 是否正确
- 是否使用了目标推理引擎不支持的算子
- 导出前后结果是否一致ONNX Runtime 推理
import numpy as np
import onnxruntime as ort
session = ort.InferenceSession(
"model.onnx",
providers=["CPUExecutionProvider"]
)
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
input_data = np.random.randn(1, 784).astype(np.float32)
result = session.run([output_name], {input_name: input_data})
print(result[0].shape)# 批量推理
batch_data = np.random.randn(32, 784).astype(np.float32)
results = session.run([output_name], {input_name: batch_data})
print(results[0].shape)FastAPI 推理服务
from fastapi import FastAPI
from pydantic import BaseModel
import numpy as np
import onnxruntime as ort
app = FastAPI(title="AI Inference Service")
session = ort.InferenceSession("model.onnx")
class PredictRequest(BaseModel):
features: list[float]
class PredictResponse(BaseModel):
prediction: int
confidence: float
probabilities: list[float]
@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
input_name = session.get_inputs()[0].name
input_data = np.array([request.features], dtype=np.float32)
logits = session.run(None, {input_name: input_data})[0][0]
probs = np.exp(logits) / np.exp(logits).sum()
pred = int(np.argmax(probs))
return PredictResponse(
prediction=pred,
confidence=float(probs[pred]),
probabilities=probs.tolist()
)
@app.get("/health")
async def health():
return {"status": "ok"}uvicorn main:app --host 0.0.0.0 --port 8000服务化时至少要关注:
- 输入格式校验
- 推理错误处理
- 健康检查
- 模型加载失败时的启动行为Docker 容器化部署
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
COPY model.onnx ./models/model.onnx
ENV MODEL_PATH=/app/models/model.onnx
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]version: '3.8'
services:
inference-api:
build: .
ports:
- "8000:8000"
environment:
- MODEL_PATH=/app/models/model.onnx
volumes:
- ./models:/app/models
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3docker build -t ai-inference:1.0.0 .
docker run --rm -p 8000:8000 ai-inference:1.0.0性能优化:量化、批处理与并发
from onnxruntime.quantization import quantize_dynamic, QuantType
quantize_dynamic(
model_input="model.onnx",
model_output="model-int8.onnx",
weight_type=QuantType.QInt8
)import os
print("original MB:", os.path.getsize("model.onnx") / 1024 / 1024)
print("quantized MB:", os.path.getsize("model-int8.onnx") / 1024 / 1024)# 简化版批处理推理
def batch_predict(session, features_list, batch_size=32):
input_name = session.get_inputs()[0].name
outputs = []
for i in range(0, len(features_list), batch_size):
batch = np.array(features_list[i:i+batch_size], dtype=np.float32)
result = session.run(None, {input_name: batch})[0]
outputs.extend(result.tolist())
return outputs优化目标通常至少有四类:
- 降低延迟
- 提升吞吐
- 降低模型体积
- 降低 GPU / CPU / 内存占用量化类型对比与选型
def explain_quantization_types():
"""量化类型对比
1. 动态量化(Dynamic Quantization):
- 权重在导出时量化为 int8,激活在推理时动态量化
- 优点:无需校准数据,使用简单
- 缺点:推理时有额外的量化/反量化开销
- 适合:RNN/LSTM 等模型
2. 静态量化(Static Quantization):
- 权重和激活都预先量化
- 需要校准数据集确定激活的量化范围
- 优点:推理速度最快
- 缺点:需要准备校准数据,精度损失可能更大
- 适合:CNN、BERT 等模型
3. 量化感知训练(QAT):
- 在训练过程中模拟量化效果
- 精度损失最小
- 成本最高,需要重新训练
- 适合:对精度要求极高的场景
精度影响:
- FP32 → FP16:精度几乎无损,体积减半,GPU 推理首选
- FP32 → INT8:可能有 1-3% 精度损失,体积减少 75%
- FP32 → INT4:精度损失较大,需要仔细验证
"""
print("量化选型建议:")
print(" GPU 推理首选: FP16(几乎无损)")
print(" CPU 推理首选: INT8 动态量化(平衡速度和精度)")
print(" 边缘设备: INT8 静态量化 + TensorRT")
print(" 超低延迟: INT4 + 量化感知训练")
explain_quantization_types()# FP16 半精度推理
session_fp16 = ort.InferenceSession(
"model.onnx",
providers=["CPUExecutionProvider"]
)
# ONNX Runtime 会自动使用 FP16 加速(如果硬件支持)
# 在 GPU 上效果更明显
session_gpu = ort.InferenceSession(
"model.onnx",
providers=["CUDAExecutionProvider"]
)
# 性能对比脚本
import time
def benchmark_session(session, input_data, warmup=10, runs=100):
input_name = session.get_inputs()[0].name
# 预热
for _ in range(warmup):
session.run(None, {input_name: input_data})
# 正式测试
start = time.perf_counter()
for _ in range(runs):
session.run(None, {input_name: input_data})
elapsed = time.perf_counter() - start
avg_latency = (elapsed / runs) * 1000 # ms
throughput = runs / elapsed # QPS
print(f"平均延迟: {avg_latency:.2f}ms, 吞吐: {throughput:.1f} QPS")
return avg_latency, throughput异步推理与请求队列
import asyncio
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
class AsyncInferenceEngine:
"""异步推理引擎 — 将 CPU 推理放入线程池避免阻塞事件循环"""
def __init__(self, model_path: str, max_workers: int = 4):
self.session = ort.InferenceSession(
model_path,
providers=["CPUExecutionProvider"],
sess_options=self._build_options()
)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.input_name = self.session.get_inputs()[0].name
def _build_options(self):
opts = ort.SessionOptions()
opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
opts.intra_op_num_threads = 2
opts.inter_op_num_threads = 2
opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
return opts
async def predict(self, features: list[float]) -> dict:
loop = asyncio.get_event_loop()
input_data = np.array([features], dtype=np.float32)
result = await loop.run_in_executor(
self.executor,
lambda: self.session.run(None, {self.input_name: input_data})
)
logits = result[0][0]
return self._postprocess(logits)
def _postprocess(self, logits):
probs = np.exp(logits) / np.exp(logits).sum()
return {
"prediction": int(np.argmax(probs)),
"confidence": float(probs.max()),
"probabilities": probs.tolist()
}
async def batch_predict(self, features_list: list[list[float]],
batch_size: int = 32) -> list[dict]:
results = []
for i in range(0, len(features_list), batch_size):
batch = features_list[i:i + batch_size]
tasks = [self.predict(f) for f in batch]
batch_results = await asyncio.gather(*tasks)
results.extend(batch_results)
return results
# 在 FastAPI 中使用
engine = AsyncInferenceEngine("model.onnx")
@app.post("/predict")
async def predict(request: PredictRequest):
result = await engine.predict(request.features)
return resultONNX Runtime 高级配置
def explain_ort_configurations():
"""ONNX Runtime 高级配置
1. SessionOptions 关键参数:
- graph_optimization_level: 图优化级别
ORT_DISABLE_ALL → ORT_ENABLE_BASIC → ORT_ENABLE_EXTENDED → ORT_ENABLE_ALL
- intra_op_num_threads: 算子内并行线程数
- inter_op_num_threads: 算子间并行线程数
- execution_mode: SEQUENTIAL 或 PARALLEL
- enable_mem_pattern: 内存模式优化
- enable_mem_reuse: 内存复用
2. ExecutionProvider 选择:
- CPUExecutionProvider: 通用 CPU 推理
- CUDAExecutionProvider: NVIDIA GPU
- TensorrtExecutionProvider: TensorRT 加速
- DmlExecutionProvider: DirectML(Windows GPU)
- CoreMLExecutionProvider: Apple 芯片
3. 性能调优建议:
- GPU 场景优先用 CUDAExecutionProvider
- CPU 场景合理设置线程数(不超过物理核心数)
- 启用所有图优化级别
- 使用 IOBinding 减少 CPU-GPU 数据拷贝
"""
print("配置建议:")
print(" CPU 推理: threads = 物理核心数, ORT_ENABLE_ALL")
print(" GPU 推理: CUDA EP + IOBinding + FP16")
print(" 边缘设备: TensorRT EP + INT8")
explain_ort_configurations()生产治理:版本、回滚、监控
deployment_metadata = {
"model_name": "risk-score-model",
"model_version": "2026.04.12",
"framework": "onnxruntime",
"batch_size": 32,
"device": "cpu",
"owner": "ai-platform-team"
}
print(deployment_metadata)生产环境建议至少监控:
- 请求 QPS
- P95 / P99 延迟
- 推理错误率
- 模型加载失败次数
- GPU 利用率 / 显存占用
- 输入分布漂移常见版本策略:
- current:当前生产版本
- next:待验证版本
- rollback:上一稳定版本
上线前:
- 对比新旧模型的离线评估
- 做小流量验证
- 出问题时快速回退到上一稳定版本优点
缺点
总结
AI 模型部署的重点不是把模型包装成一个 HTTP 接口,而是把模型当作一个长期运行的生产组件来治理。真正进入生产环境时,你需要同时考虑导出格式、推理框架、部署方式、性能指标、健康检查、版本回滚和数据漂移,而不是只关注离线精度。
关键知识点
- 模型导出和模型部署是两件事,之间还隔着推理框架与运行环境。
- REST / gRPC 接口只是壳,模型治理才是核心。
- 性能优化不能只看模型计算,还要看预处理、后处理、序列化和网络成本。
- 上线后仍要持续监控效果与分布漂移,部署不是结束。
项目落地视角
- 小模型服务可以先从 FastAPI + ONNX Runtime 起步。
- 高并发推理服务要重点关注批处理、并发限制和资源隔离。
- GPU 场景要同时看吞吐、显存占用和冷启动时间。
- 企业系统里通常还要做版本审批、AB 测试和快速回滚能力。
常见误区
- 只验证本地推理通过,不验证容器内行为和线上依赖。
- 只看离线准确率,不看线上延迟和资源消耗。
- 量化后模型体积变小,就以为线上一定更快。
- 模型上线后没有监控和回滚方案,出问题只能临时救火。
进阶路线
- 学习 TensorRT、OpenVINO、Triton Inference Server、TorchServe 等更专业的推理方案。
- 结合 Kubernetes、GPU 调度、服务网格做大规模推理平台化部署。
- 建立模型注册、版本审批、灰度发布和回滚机制。
- 深入研究漂移检测、在线评估和推理链路可观测性。
适用场景
- 模型在线推理 API。
- 批量离线推理服务。
- GPU 推理服务和边缘设备部署。
- 企业内部统一 AI 推理平台。
落地建议
- 从最小可部署路径开始:导出 → 推理 → API → Docker → 健康检查。
- 模型上线前建立统一版本号、元数据和评估记录。
- 对关键推理链路建立延迟、错误率、资源占用监控。
- 对每个生产模型准备稳定版本和回滚版本,避免上线后无退路。
排错清单
- 模型导出失败:检查算子支持、输入 shape、导出参数。
- 推理报错:检查输入类型、预处理、provider 和模型路径。
- 延迟高:检查 batch 策略、预处理瓶颈、框架选择和硬件配置。
- 上线后效果差:检查线上输入分布是否偏离训练集。
Triton Inference Server 概述
def explain_triton_inference_server():
"""NVIDIA Triton Inference Server
Triton 是 NVIDIA 推出的高性能推理服务框架,
支持多种框架(ONNX、TensorRT、PyTorch、TF)和多种后端。
核心能力:
1. 多模型服务:同时服务多个模型,不同框架混用
2. 动态批处理:自动将小请求合并为大批次推理
3. 模型热更新:不中断服务即可更新模型
4. 多实例并发:同一模型启动多个实例提高吞吐
5. GPU 显存管理:多模型共享 GPU 显存
部署配置示例(config.pbtxt):
name: "risk-score-model"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
{ name: "input", data_type: TYPE_FP32, dims: [784] }
]
output [
{ name: "output", data_type: TYPE_FP32, dims: [10] }
]
dynamic_batching {
preferred_batch_size: [8, 16, 32]
max_queue_delay_microseconds: 5000
}
适合场景:
- 多模型同时在线推理
- GPU 推理服务化
- 需要动态批处理的高吞吐场景
- 模型频繁更新的生产环境
"""
print("Triton 适合:")
print(" GPU 推理服务化(尤其多模型)")
print(" 需要动态批处理提升吞吐")
print(" 多框架模型统一管理")
explain_triton_inference_server()滚动发布与灰度策略
def explain_deployment_strategies():
"""模型滚动发布与灰度策略
1. 蓝绿部署(Blue-Green):
- 两套完全相同的环境,同时运行新旧版本
- 切换时只需修改路由/负载均衡
- 回滚即时:切回旧版本即可
- 缺点:需要双倍资源
2. 金丝雀发布(Canary):
- 先让少量流量(如 5%)打到新版本
- 观察关键指标(延迟、错误率、业务指标)
- 逐步增加流量比例(5% → 20% → 50% → 100%)
- 适合对稳定性要求高的场景
3. A/B 测试:
- 按用户特征(地域、等级)分流
- 对比新旧版本的业务效果
- 不仅看技术指标,还看业务指标
模型发布检查清单:
- [ ] 离线评估:新模型在测试集上的精度不低于旧模型
- [ ] 延迟测试:P99 延迟不高于旧模型
- [ ] 资源测试:显存/内存占用在可接受范围
- [ ] 一致性测试:新模型对相同输入的结果是否合理
- [ ] 回滚方案:确认上一稳定版本可以随时切回
- [ ] 监控就绪:新版本的指标大盘已配置告警
"""
print("发布策略选择:")
print(" 首次上线: 蓝绿部署(快速回滚)")
print(" 日常迭代: 金丝雀发布(渐进放量)")
print(" 效果验证: A/B 测试(对比业务指标)")
explain_deployment_strategies()推理服务可观测性
# Prometheus 指标采集(FastAPI + prometheus_client)
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
# 定义指标
PREDICTION_COUNT = Counter(
"inference_predictions_total",
"Total predictions",
["model_version", "status"]
)
PREDICTION_LATENCY = Histogram(
"inference_prediction_duration_seconds",
"Prediction latency",
["model_version"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
MODEL_VERSION = Gauge(
"inference_model_version",
"Current model version"
)
# 在推理接口中使用
@app.post("/predict")
async def predict(request: PredictRequest):
start_time = time.perf_counter()
try:
result = await engine.predict(request.features)
latency = time.perf_counter() - start_time
PREDICTION_COUNT.labels(
model_version=deployment_metadata["model_version"],
status="success"
).inc()
PREDICTION_LATENCY.labels(
model_version=deployment_metadata["model_version"]
).observe(latency)
return result
except Exception as e:
PREDICTION_COUNT.labels(
model_version=deployment_metadata["model_version"],
status="error"
).inc()
raise
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")推理服务关键监控指标:
- 请求 QPS 和成功/失败比例
- P50 / P95 / P99 推理延迟
- 模型加载时间(冷启动)
- GPU 利用率、显存占用、温度
- 输入特征分布漂移检测
- 请求队列长度和等待时间