AI 工程化与 MLOps
大约 10 分钟 · 约 3124 字
AI 工程化与 MLOps
简介
将 AI 模型从实验阶段推向生产环境需要完整的工程化体系。MLOps(Machine Learning Operations)涵盖模型训练管理、实验追踪、模型注册、服务部署和监控。理解 MLflow、模型服务和 A/B 测试的实践,有助于构建可靠的 AI 生产系统。
特点
实验追踪
MLflow 集成
# pip install mlflow scikit-learn
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

# 1. Point MLflow at the tracking server and select the experiment
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("customer-churn-prediction")


# 2. Experiment tracking
def train_and_log_model(X, y, params):
    """Train a RandomForest classifier and log the run to MLflow.

    Logs hyperparameters, accuracy/F1 metrics, the fitted model (registered
    as "customer-churn-rf"), and a feature-importance plot artifact.

    Args:
        X: feature matrix.
        y: target labels.
        params: dict with keys 'n_estimators' and 'max_depth'.

    Returns:
        (model, accuracy) tuple.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42)
    with mlflow.start_run(run_name=f"rf_{params['n_estimators']}"):
        # Log hyperparameters
        mlflow.log_params(params)
        # Train the model
        model = RandomForestClassifier(
            n_estimators=params['n_estimators'],
            max_depth=params['max_depth'],
            random_state=42
        )
        model.fit(X_train, y_train)
        # Evaluate on the held-out split
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred, average='weighted')
        # Log metrics
        mlflow.log_metrics({
            "accuracy": accuracy,
            "f1_score": f1
        })
        # Log and register the model
        mlflow.sklearn.log_model(
            model,
            artifact_path="model",
            registered_model_name="customer-churn-rf"
        )
        # Log a feature-importance plot as an artifact
        import matplotlib.pyplot as plt
        importances = model.feature_importances_
        plt.figure(figsize=(10, 6))
        plt.barh(range(len(importances)), importances)
        plt.xlabel('Feature Importance')
        plt.savefig('feature_importance.png')
        # BUGFIX: close the figure; the original leaked one open figure per
        # run, which accumulates across a grid search.
        plt.close()
        mlflow.log_artifact('feature_importance.png')
        print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
        print(f"Run ID: {mlflow.active_run().info.run_id}")
        return model, accuracy
# 3. Hyperparameter grid search (assumes X and y are already defined)
from itertools import product

param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [5, 10, 15, None]
}
best_model = None
best_accuracy = 0
# Try every (n_estimators, max_depth) combination and keep the best run
for n_est, max_d in product(param_grid['n_estimators'], param_grid['max_depth']):
    params = {'n_estimators': n_est, 'max_depth': max_d}
    model, accuracy = train_and_log_model(X, y, params)
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_model = model
print(f"最佳模型准确率: {best_accuracy:.4f}")
# 4. Model version management via the MLflow registry
client = mlflow.tracking.MlflowClient()

# List the registered versions of the model
model_versions = client.search_model_versions("name='customer-churn-rf'")
for version in model_versions:
    print(f"Version: {version.version}, Status: {version.status}")

# Promote version 1 to the Production stage
# NOTE(review): stage transitions are deprecated in newer MLflow releases in
# favor of model aliases -- confirm the server version before relying on this.
client.transition_model_version_stage(
    name="customer-churn-rf",
    version=1,
    stage="Production"
)
模型服务
FastAPI 推理服务
# model_server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np
import mlflow
import time
from prometheus_client import Counter, Histogram, generate_latest
from starlette.responses import Response
app = FastAPI(title="ML Model Server")
# Prometheus metrics: request counter labeled by model/status, latency histogram per model
REQUEST_COUNT = Counter('model_requests_total', 'Total requests', ['model', 'status'])
REQUEST_LATENCY = Histogram('model_request_latency_seconds', 'Request latency', ['model'])
# Model loading and inference wrapper
class ModelService:
    """Holds the currently loaded model and serves probability predictions."""

    def __init__(self):
        self.model = None          # loaded sklearn model (None until load_model succeeds)
        self.model_version = None  # version label reported in API responses
        self.load_time = None      # unix timestamp of the last successful load

    def load_model(self, model_uri, model_version=None):
        """Load a model from an MLflow URI.

        BUGFIX: the original never assigned ``self.model_version``, so the
        /predict response always reported "latest". The new optional
        ``model_version`` argument is backward-compatible (defaults to None,
        preserving the old behavior when omitted).
        """
        self.model = mlflow.sklearn.load_model(model_uri)
        self.model_version = model_version
        self.load_time = time.time()
        print(f"模型已加载: {model_uri}")

    def predict(self, features):
        """Return class probabilities for a batch of feature rows.

        Raises:
            RuntimeError: if no model has been loaded yet.
        """
        if self.model is None:
            raise RuntimeError("模型未加载")
        return self.model.predict_proba(features)


model_service = ModelService()
# Request/response schemas
class PredictionRequest(BaseModel):
    features: list[list[float]]        # batch of feature vectors
    model_version: str | None = None   # optional pin to a specific model version

class PredictionResponse(BaseModel):
    predictions: list[list[float]]     # per-row class probabilities
    model_version: str                 # which model version produced the result
    latency_ms: float                  # server-side latency in milliseconds
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Score a batch of feature vectors with the currently loaded model."""
    start = time.time()
    try:
        features = np.array(request.features)
        predictions = model_service.predict(features).tolist()
        latency = (time.time() - start) * 1000
        REQUEST_COUNT.labels(model='churn', status='success').inc()
        REQUEST_LATENCY.labels(model='churn').observe(time.time() - start)
        return PredictionResponse(
            predictions=predictions,
            model_version=model_service.model_version or "latest",
            latency_ms=round(latency, 2)
        )
    except Exception as e:
        # Any failure (bad payload shape, model not loaded, ...) surfaces as
        # HTTP 500; chain the cause so the traceback keeps the original error.
        REQUEST_COUNT.labels(model='churn', status='error').inc()
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/health")
async def health():
    """Liveness probe; also reports whether a model is loaded."""
    return {"status": "healthy", "model_loaded": model_service.model is not None}


@app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics in the text exposition format."""
    return Response(content=generate_latest(), media_type="text/plain")
# 启动时加载模型
@app.on_event("startup")
async def startup():
model_uri = "models:/customer-churn-rf/Production"
model_service.load_model(model_uri)ONNX 推理优化
# ONNX model conversion and inference
import onnxruntime as ort


def convert_to_onnx(sklearn_model, input_shape, output_path):
    """Convert a fitted scikit-learn model to ONNX and write it to disk.

    Args:
        sklearn_model: fitted estimator supported by skl2onnx.
        input_shape: number of input feature columns.
        output_path: destination path for the .onnx file.
    """
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import FloatTensorType

    # Batch dimension is left dynamic (None); only the feature count is fixed
    initial_type = [('float_input', FloatTensorType([None, input_shape]))]
    onnx_model = convert_sklearn(sklearn_model, initial_types=initial_type)
    with open(output_path, "wb") as f:
        f.write(onnx_model.SerializeToString())
    print(f"ONNX 模型已保存: {output_path}")
class ONNXModelService:
    """ONNX Runtime inference service (higher throughput than raw sklearn)."""

    def __init__(self, model_path):
        # Enable the full graph-optimization pipeline and cap intra-op threads
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        sess_options.intra_op_num_threads = 4
        # Prefer GPU when available; ORT silently falls back to CPU otherwise
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        self.session = ort.InferenceSession(model_path, sess_options, providers=providers)
        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]

    def predict(self, features):
        """Run ONNX inference; returns the first output tensor."""
        input_data = np.array(features, dtype=np.float32)
        outputs = self.session.run(self.output_names, {self.input_name: input_data})
        return outputs[0]

# Rough latency comparison (illustrative figures, not a benchmark):
# sklearn predict: ~50ms
# ONNX CPU: ~5ms (about 10x faster)
# ONNX GPU: ~1ms (about 50x faster)
模型监控
数据版本管理
# Data versioning with DVC (Data Version Control):
# Git versions code; DVC versions data and model files the same way.
# Install DVC
# pip install dvc dvc-s3
# Initialize DVC (inside a Git repository)
# dvc init
# Track a data file
# dvc add data/training_data.csv
# git add data/training_data.csv.dvc data/.gitignore
# git commit -m "添加训练数据 v1"
# Configure remote storage (MinIO / S3)
# dvc remote add -d myremote s3://mybucket/dvc-storage
# dvc remote modify myremote endpointurl http://192.168.1.100:9000
# dvc remote modify myremote access_key_id admin
# dvc remote modify myremote secret_access_key MinIO@2024Secret
# Push data to the remote storage
# dvc push
# Pull data
# dvc pull
# Switch to another data version
# git checkout v1.0
# dvc checkout  # restore the data that matches v1.0
特征存储(Feature Store)
# A feature store centralizes feature creation, storage, and serving,
# so training and online inference share the same lookup logic and
# training-serving skew is avoided.
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import pandas as pd
import numpy as np


@dataclass
class FeatureDefinition:
    """Metadata for a single feature tracked by the feature store."""
    name: str
    dtype: str
    description: str
    # Where the feature originates: raw_table, computed, or external
    source: str
    default_value: Optional[object] = None
class FeatureStore:
    """Minimal in-memory feature store.

    Registers feature definitions, holds materialized feature tables, and
    assembles feature vectors for both batch and online lookups.
    """

    def __init__(self):
        # table name -> DataFrame containing an "entity_id" column
        self._features: dict[str, pd.DataFrame] = {}
        # feature name -> FeatureDefinition
        self._definitions: dict[str, FeatureDefinition] = {}

    def register_feature(self, definition: FeatureDefinition):
        """Register a feature definition."""
        self._definitions[definition.name] = definition

    def compute_features(self, entity_ids: list[int], feature_names: list[str]) -> pd.DataFrame:
        """Assemble the requested features for the given entities.

        Searches every stored table for the feature column; entities missing
        from the table (and features never stored at all) fall back to the
        definition's default_value.

        Raises:
            ValueError: if a requested feature was never registered.
        """
        result = pd.DataFrame({"entity_id": entity_ids})
        for name in feature_names:
            if name not in self._definitions:
                raise ValueError(f"未注册的特征: {name}")
            default = self._definitions[name].default_value
            # BUGFIX: the original indexed self._features by *feature* name,
            # but store_features() keys tables by *table* name, so stored
            # features were never found and defaults were always returned.
            # Search the stored tables for the requested column instead.
            table = next(
                (df for df in self._features.values() if name in df.columns),
                None,
            )
            if table is not None:
                result = result.merge(
                    table[["entity_id", name]], on="entity_id", how="left")
                # BUGFIX: entities absent from the table previously came back
                # as NaN; apply the declared default instead.
                if default is not None:
                    result[name] = result[name].fillna(default)
            else:
                result[name] = default
        return result

    def store_features(self, feature_name: str, data: pd.DataFrame):
        """Store a materialized feature table (must contain "entity_id")."""
        self._features[feature_name] = data

    def get_feature_vector(self, entity_id: int, feature_names: list[str]) -> dict:
        """Fetch one entity's feature vector (for online inference)."""
        df = self.compute_features([entity_id], feature_names)
        if len(df) == 0:
            return {}
        return df.iloc[0].to_dict()
# 使用示例
store = FeatureStore()
# 注册特征
store.register_feature(FeatureDefinition(
name="user_age", dtype="int", description="用户年龄",
source="raw_table", default_value=0))
store.register_feature(FeatureDefinition(
name="purchase_count_30d", dtype="int", description="30天购买次数",
source="computed", default_value=0))
store.register_feature(FeatureDefinition(
name="avg_order_value", dtype="float", description="平均订单金额",
source="computed", default_value=0.0))
# 存储计算好的特征
user_features = pd.DataFrame({
"entity_id": [1, 2, 3],
"user_age": [25, 35, 45],
"purchase_count_30d": [5, 12, 3],
"avg_order_value": [150.0, 280.0, 95.0]
})
store.store_features("user_features", user_features)
# 在线推理时获取特征向量
feature_vector = store.get_feature_vector(1, ["user_age", "purchase_count_30d", "avg_order_value"])
# {'entity_id': 1, 'user_age': 25, 'purchase_count_30d': 5, 'avg_order_value': 150.0}CI/CD 流水线配置
# .github/workflows/ml-pipeline.yml
# GitHub Actions pipeline: model training, validation, and deployment
name: ML Pipeline

on:
  push:
    paths:
      - 'models/**'
      - 'data/**'
  schedule:
    - cron: '0 2 * * 0'  # retrain weekly, Sunday 02:00

jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install mlflow scikit-learn pytest
      - name: Run tests
        run: pytest tests/ -v --cov=src
      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
        run: |
          python scripts/train.py --experiment customer-churn
      - name: Validate model
        run: |
          python scripts/validate.py --min-accuracy 0.85
      - name: Register model
        if: success()
        run: |
          python scripts/register_model.py --stage Staging

  deploy:
    needs: train
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to staging
        run: |
          # deploy to the staging environment
          echo "Deploying model to staging..."
      - name: Run A/B test
        run: |
          # run the A/B test for 24 hours
          python scripts/ab_test.py --duration 86400
      - name: Promote to production
        run: |
          # promote the model to the production stage
          python scripts/promote_model.py --stage Production
模型性能基线与退化检测
# Model performance baseline management
class ModelPerformanceBaseline:
    """Tracks baseline metrics for a model and flags degradation."""

    def __init__(self, model_name: str):
        self.model_name = model_name
        self.baselines: dict[str, float] = {}
        # Relative drop below baseline that triggers an alert (5%)
        self.alert_threshold = 0.05

    def set_baseline(self, metric_name: str, value: float):
        """Record the baseline value for one metric."""
        self.baselines[metric_name] = value

    def check_performance(self, current_metrics: dict[str, float]) -> dict:
        """Compare current metrics against their baselines.

        Returns, per metric that has a baseline, the baseline, current value,
        degradation percentage, and alert status. Metrics without a baseline
        are silently skipped.
        """
        results = {}
        for metric, current_value in current_metrics.items():
            if metric not in self.baselines:
                continue
            baseline = self.baselines[metric]
            degradation = (baseline - current_value) / baseline
            exceeded = degradation > self.alert_threshold
            results[metric] = {
                "baseline": baseline,
                "current": current_value,
                "degradation_pct": round(degradation * 100, 2),
                "degraded": exceeded,
                "status": "CRITICAL" if exceeded else "OK"
            }
        return results

    def generate_report(self, current_metrics: dict[str, float]) -> str:
        """Render a human-readable performance report."""
        check = self.check_performance(current_metrics)
        lines = [f"模型性能报告: {self.model_name}", "=" * 40]
        for metric, info in check.items():
            status_icon = "!!!" if info["degraded"] else "OK "
            lines.append(
                f"{status_icon} {metric}: 基线={info['baseline']:.4f}, "
                f"当前={info['current']:.4f}, "
                f"退化={info['degradation_pct']}%"
            )
        return "\n".join(lines)
# Usage
baseline = ModelPerformanceBaseline("customer-churn-rf")
baseline.set_baseline("accuracy", 0.92)
baseline.set_baseline("f1_score", 0.89)
baseline.set_baseline("precision", 0.91)

current = {"accuracy": 0.87, "f1_score": 0.85, "precision": 0.88}
report = baseline.generate_report(current)
print(report)
# Expected output (only accuracy exceeds the 5% alert threshold; the
# original comments wrongly flagged precision at 3.30% as degraded):
# !!! accuracy: 基线=0.9200, 当前=0.8700, 退化=5.43%
# OK  f1_score: 基线=0.8900, 当前=0.8500, 退化=4.49%
# OK  precision: 基线=0.9100, 当前=0.8800, 退化=3.3%
模型监控
数据漂移检测
# Model monitoring -- detect data drift and performance degradation
class ModelMonitor:
    """Monitors per-feature drift (PSI) and online prediction accuracy."""

    def __init__(self, reference_data, threshold=0.05):
        # reference_data: 2-D array (rows, features) captured at training time
        # -- assumed indexable as [:, i]; TODO confirm callers pass ndarrays
        self.reference_data = reference_data
        # PSI above this value counts as drift
        self.threshold = threshold
        self.predictions_log = []

    def detect_data_drift(self, current_data, feature_names):
        """Compute PSI per feature between reference and current data.

        Returns {feature_name: {'psi': float, 'drift_detected': bool}}.
        """
        # NOTE: removed an unused `from scipy import stats` import that the
        # original carried -- PSI only needs numpy histograms.
        drift_results = {}
        for i, name in enumerate(feature_names):
            ref_col = self.reference_data[:, i]
            cur_col = current_data[:, i]
            psi = self._calculate_psi(ref_col, cur_col)
            drift_results[name] = {
                'psi': psi,
                'drift_detected': psi > self.threshold
            }
        return drift_results

    def _calculate_psi(self, expected, actual, bins=10):
        """Population Stability Index between two 1-D samples.

        Bin edges are derived from the expected (reference) sample and reused
        for the actual sample so both histograms are directly comparable.
        """
        expected_hist, bin_edges = np.histogram(expected, bins=bins, density=True)
        actual_hist, _ = np.histogram(actual, bins=bin_edges, density=True)
        # epsilon avoids log(0) / division by zero for empty bins
        epsilon = 1e-6
        expected_hist = expected_hist + epsilon
        actual_hist = actual_hist + epsilon
        psi = np.sum((actual_hist - expected_hist) * np.log(actual_hist / expected_hist))
        return psi

    def log_prediction(self, features, prediction, actual=None):
        """Append one prediction (with optional ground truth) to the log."""
        self.predictions_log.append({
            'timestamp': time.time(),
            'features': features,
            'prediction': prediction,
            'actual': actual
        })

    def calculate_metrics(self):
        """Accuracy over logged predictions that have ground truth labels."""
        if not self.predictions_log:
            return {}
        completed = [p for p in self.predictions_log if p['actual'] is not None]
        if not completed:
            return {"pending": len(self.predictions_log)}
        y_true = [p['actual'] for p in completed]
        y_pred = [p['prediction'] for p in completed]
        accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
        return {
            "total_predictions": len(self.predictions_log),
            "completed": len(completed),
            "accuracy": accuracy,
            "accuracy_trend": self._calculate_trend(completed)
        }

    def _calculate_trend(self, data, window=100):
        """Accuracy per non-overlapping window of `window` completed samples."""
        if len(data) < window:
            return []
        trend = []
        for i in range(window, len(data), window):
            batch = data[i-window:i]
            acc = sum(p['actual'] == p['prediction'] for p in batch) / len(batch)
            trend.append(acc)
        return trend
# A/B testing
import hashlib


class ABTestRouter:
    """Routes prediction requests between two model variants."""

    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        # Fraction of traffic sent to model A (0.0 - 1.0)
        self.traffic_split = traffic_split
        self.results = {'a': [], 'b': []}

    def route(self, features, user_id):
        """Route one request to model A or B based on a hash of user_id.

        The same user always lands on the same model. BUGFIX: the builtin
        hash() is randomized per process for strings (PYTHONHASHSEED), so the
        original routing was NOT stable across restarts despite claiming to
        be; a stable MD5-derived bucket fixes that.
        """
        digest = hashlib.md5(str(user_id).encode("utf-8")).hexdigest()
        bucket = int(digest, 16) % 100 / 100.0
        if bucket < self.traffic_split:
            prediction = self.model_a.predict([features])[0]
            self.results['a'].append({'prediction': prediction})
            return prediction, 'a'
        prediction = self.model_b.predict([features])[0]
        self.results['b'].append({'prediction': prediction})
        return prediction, 'b'

    def get_statistics(self):
        """Request counts observed per variant."""
        return {
            'model_a': {
                'requests': len(self.results['a']),
            },
            'model_b': {
                'requests': len(self.results['b']),
            }
        }
优点
缺点
总结
MLOps 通过 MLflow 实现实验追踪和模型版本管理,记录参数、指标和产出物。模型服务使用 FastAPI 封装推理接口,支持健康检查和 Prometheus 指标。ONNX 格式提供 10x-50x 的推理加速。模型监控通过 PSI 检测数据漂移,A/B 测试比较不同模型版本的线上表现。完整的 MLOps 流水线包括:数据准备 → 模型训练 → 实验追踪 → 模型注册 → 部署上线 → 持续监控。
关键知识点
- 先分清模型能力边界、数据边界和工程边界。
- 任何 AI 主题都不只看效果,还要看延迟、成本、可解释性和安全性。
- 评估方式和失败样例往往比“换哪个模型”更重要。
项目落地视角
- 给数据来源、Prompt 模板、Embedding 版本、评估集和实验结果做版本管理。
- 上线前准备兜底策略,例如拒答、回退、人工审核或缓存降级。
- 观察错误类型时,区分数据问题、召回问题、提示词问题和模型问题。
常见误区
- 只关注 Demo 效果,不考虑线上稳定性和可复现性。
- 没有评估集就频繁调参,最后无法解释为什么变好或变差。
- 忽略权限、审计、隐私和模型输出的安全边界。
进阶路线
- 继续补齐训练、推理、评估、MLOps 和治理链路。
- 把主题放回真实业务流程,思考谁提供数据、谁消费结果、谁负责兜底。
- 把 PoC 逐步升级到可观测、可回滚、可演进的生产方案。
适用场景
- 当你准备把《AI 工程化与 MLOps》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合企业知识问答、内容生成、分类抽取和智能助手等场景。
- 当需求同时关注效果、时延、成本和安全边界时,这类主题最有价值。
落地建议
- 先定义评估集、成功标准和失败样例,再开始调模型或调提示。
- 把数据来源、分块方式、Embedding 版本和 Prompt 模板纳入版本管理。
- 上线前准备兜底策略,例如拒答、回退、人工审核或检索降级。
排错清单
- 先判断问题出在数据、检索、Prompt、模型还是后处理。
- 检查上下文是否过长、分块是否过碎或召回是否偏题。
- 对错误回答做分类,区分幻觉、事实过时、指令误解和格式错误。
复盘问题
- 如果把《AI 工程化与 MLOps》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《AI 工程化与 MLOps》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《AI 工程化与 MLOps》最大的收益和代价分别是什么?
