LLM Application Architecture Design
Introduction
Building an application on top of a large language model (LLM) is far more than calling an API. A production-grade LLM application has to solve a whole series of architectural problems: prompt management, context-window optimization, conversation memory, streaming responses, token cost control, model routing, semantic caching, and evaluation metrics. This article works through the core architecture patterns of LLM applications, from single calls to multi-turn conversations and from monoliths to microservices, to help you build applications that are reliable, scalable, and cost-controlled.
The core challenge of LLM applications is uncertainty. Unlike traditional software with deterministic inputs and outputs, an LLM's responses are probabilistic. This forces us to build more monitoring, fallback, and evaluation mechanisms into the architecture. A good LLM architecture does not eliminate uncertainty; it manages it, using retries, degradation, caching, and evaluation to keep the system stable and reliable on top of probabilistic output.
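As a first taste of what "managing uncertainty" looks like in code, here is a minimal retry-with-degradation wrapper. It is illustrative only (the helper name and canned fallback text are this article's own); the full patterns are developed section by section below.

import asyncio

async def call_with_retry(call, max_retries: int = 3,
                          fallback: str = "Service busy, please try again later."):
    """Retry a probabilistic LLM call; degrade to a canned answer instead of raising."""
    for attempt in range(max_retries):
        try:
            return await call()
        except Exception:
            await asyncio.sleep(2 ** attempt)  # exponential backoff between attempts
    return fallback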
Prompt Template Management
Template engine implementation
"""
提示词模板管理 — 支持版本化、参数化和 A/B 测试
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import hashlib
import json
from datetime import datetime
@dataclass
class PromptTemplate:
"""提示词模板"""
template_id: str
name: str
content: str # 使用 {variable} 占位符
version: str
variables: List[str] = field(default_factory=list)
description: str = ""
tags: List[str] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.now)
is_active: bool = True
def render(self, **kwargs) -> str:
"""渲染模板,替换变量"""
missing = set(self.variables) - set(kwargs.keys())
if missing:
raise ValueError(f"缺少模板变量:{missing}")
return self.content.format(**kwargs)
def fingerprint(self) -> str:
"""计算模板指纹(用于缓存和去重)"""
content_hash = hashlib.sha256(self.content.encode()).hexdigest()[:16]
return f"{self.template_id}:v{self.version}:{content_hash}"
@dataclass
class PromptTemplateVersion:
"""模板版本管理"""
template_id: str
versions: Dict[str, PromptTemplate] = field(default_factory=dict)
active_version: str = ""
def add_version(self, template: PromptTemplate):
self.versions[template.version] = template
if template.is_active:
self.active_version = template.version
def get_active(self) -> Optional[PromptTemplate]:
return self.versions.get(self.active_version)
def get_version(self, version: str) -> Optional[PromptTemplate]:
return self.versions.get(version)
def activate(self, version: str):
if version not in self.versions:
raise ValueError(f"版本 {version} 不存在")
# 停用所有版本
for v in self.versions.values():
v.is_active = False
self.versions[version].is_active = True
self.active_version = version
class PromptTemplateManager:
"""提示词模板管理器"""
def __init__(self):
self._templates: Dict[str, PromptTemplateVersion] = {}
def register(self, template: PromptTemplate):
if template.template_id not in self._templates:
self._templates[template.template_id] = PromptTemplateVersion(template.template_id)
self._templates[template.template_id].add_version(template)
def get(self, template_id: str, version: str = None) -> Optional[PromptTemplate]:
version_manager = self._templates.get(template_id)
if not version_manager:
return None
if version:
return version_manager.get_version(version)
return version_manager.get_active()
def render(self, template_id: str, **kwargs) -> str:
template = self.get(template_id)
if not template:
raise ValueError(f"模板 {template_id} 不存在")
return template.render(**kwargs)
# 使用示例
manager = PromptTemplateManager()
# 注册模板
manager.register(PromptTemplate(
template_id="product_description",
name="商品描述生成",
content="你是一个专业的电商文案撰写师。请根据以下信息生成商品描述:\n\n"
"商品名称:{product_name}\n"
"商品类别:{category}\n"
"核心卖点:{key_features}\n"
"目标用户:{target_audience}\n\n"
"要求:\n"
"1. 字数控制在 200-300 字\n"
"2. 突出核心卖点\n"
"3. 语气符合目标用户群体",
version="1.0",
variables=["product_name", "category", "key_features", "target_audience"],
tags=["电商", "文案"]
))
# 渲染模板
prompt = manager.render("product_description",
product_name="智能手表 Pro",
category="可穿戴设备",
key_features="心率监测、GPS 定位、7天续航",
target_audience="运动爱好者"
)
print(prompt)上下文窗口优化
Text chunking strategies
"""
上下文窗口优化 — 文本分块与 RAG 检索
"""
import tiktoken
from typing import List, Tuple
from dataclasses import dataclass
@dataclass
class TextChunk:
"""文本分块"""
content: str
index: int
start_char: int
end_char: int
token_count: int
metadata: dict = None
class TextChunker:
"""文本分块器 — 多种分块策略"""
def __init__(self, model: str = "gpt-4"):
try:
self.encoding = tiktoken.encoding_for_model(model)
except KeyError:
self.encoding = tiktoken.get_encoding("cl100k_base")
def count_tokens(self, text: str) -> int:
"""计算 Token 数量"""
return len(self.encoding.encode(text))
def chunk_by_token_size(
self,
text: str,
max_tokens: int = 512,
overlap_tokens: int = 50
) -> List[TextChunk]:
"""
按 Token 大小分块(推荐)
overlap_tokens: 重叠 Token 数,确保上下文连贯
"""
tokens = self.encoding.encode(text)
chunks = []
start = 0
index = 0
while start < len(tokens):
end = min(start + max_tokens, len(tokens))
chunk_tokens = tokens[start:end]
chunk_text = self.encoding.decode(chunk_tokens)
chunks.append(TextChunk(
content=chunk_text,
index=index,
start_char=0, # 简化计算
end_char=0,
token_count=len(chunk_tokens)
))
index += 1
if end >= len(tokens):
break
start = end - overlap_tokens # 重叠部分
return chunks
def chunk_by_paragraph(
self,
text: str,
max_tokens: int = 512
) -> List[TextChunk]:
"""按段落分块(保持语义完整性)"""
paragraphs = text.split("\n\n")
chunks = []
current_chunk = ""
current_tokens = 0
index = 0
for para in paragraphs:
para_tokens = self.count_tokens(para)
# 单个段落超过限制,需要进一步切分
if para_tokens > max_tokens:
if current_chunk:
chunks.append(TextChunk(
content=current_chunk.strip(),
index=index,
start_char=0, end_char=0,
token_count=current_tokens
))
index += 1
current_chunk = ""
current_tokens = 0
# 按句子再切分
sentences = para.replace("。", "。\n").replace("!", "!\n").split("\n")
for sent in sentences:
sent_tokens = self.count_tokens(sent)
if current_tokens + sent_tokens > max_tokens:
if current_chunk:
chunks.append(TextChunk(
content=current_chunk.strip(),
index=index,
start_char=0, end_char=0,
token_count=current_tokens
))
index += 1
current_chunk = sent
current_tokens = sent_tokens
else:
current_chunk += sent
current_tokens += sent_tokens
elif current_tokens + para_tokens > max_tokens:
chunks.append(TextChunk(
content=current_chunk.strip(),
index=index,
start_char=0, end_char=0,
token_count=current_tokens
))
index += 1
current_chunk = para + "\n\n"
current_tokens = para_tokens
else:
current_chunk += para + "\n\n"
current_tokens += para_tokens
if current_chunk.strip():
chunks.append(TextChunk(
content=current_chunk.strip(),
index=index,
start_char=0, end_char=0,
token_count=current_tokens
))
return chunks
# 使用示例
chunker = TextChunker(model="gpt-4")
long_text = "这是一段很长的文本..." * 100
chunks = chunker.chunk_by_token_size(long_text, max_tokens=256, overlap_tokens=30)
for chunk in chunks:
print(f"块 {chunk.index}: {chunk.token_count} tokens, {len(chunk.content)} chars")RAG 检索增强生成
"""
RAG(Retrieval-Augmented Generation)检索增强生成
"""
from typing import List, Optional
from dataclasses import dataclass
import json
@dataclass
class Document:
"""知识库文档"""
doc_id: str
content: str
metadata: dict = None
embedding: List[float] = None
class VectorStore:
"""向量存储(简化实现)"""
def __init__(self):
self._documents: List[Document] = []
def add(self, doc: Document):
self._documents.append(doc)
def search(self, query_embedding: List[float], top_k: int = 5) -> List[Tuple[Document, float]]:
"""余弦相似度搜索"""
results = []
for doc in self._documents:
if doc.embedding is None:
continue
similarity = self._cosine_similarity(query_embedding, doc.embedding)
results.append((doc, similarity))
results.sort(key=lambda x: x[1], reverse=True)
return results[:top_k]
@staticmethod
def _cosine_similarity(a: List[float], b: List[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
norm_a = sum(x ** 2 for x in a) ** 0.5
norm_b = sum(x ** 2 for x in b) ** 0.5
return dot / (norm_a * norm_b + 1e-8)
class RAGService:
"""RAG 检索增强生成服务"""
def __init__(self, vector_store: VectorStore, llm_client):
self.vector_store = vector_store
self.llm_client = llm_client
async def query(
self,
question: str,
top_k: int = 5,
max_context_tokens: int = 3000
) -> dict:
"""RAG 查询流程"""
# 1. 将问题转换为向量
query_embedding = await self.llm_client.embed(question)
# 2. 检索相关文档
results = self.vector_store.search(query_embedding, top_k=top_k)
# 3. 构建上下文(控制 Token 数量)
context_parts = []
total_tokens = 0
chunker = TextChunker()
for doc, score in results:
doc_tokens = chunker.count_tokens(doc.content)
if total_tokens + doc_tokens > max_context_tokens:
break
context_parts.append(doc.content)
total_tokens += doc_tokens
context = "\n\n---\n\n".join(context_parts)
# 4. 构建增强提示词
prompt = f"""基于以下参考资料回答问题。如果参考资料中没有相关信息,请明确说明。
参考资料:
{context}
问题:{question}
请提供准确、有依据的回答,并在回答中引用来源。"""
# 5. 调用 LLM 生成回答
response = await self.llm_client.chat(prompt)
return {
"question": question,
"answer": response,
"sources": [{"doc_id": doc.doc_id, "score": float(score)} for doc, score in results],
"context_tokens": total_tokens
}对话记忆管理
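A quick usage sketch for the RAG service. FakeLLMClient below is a hypothetical stand-in exposing the embed() and chat() coroutines that RAGService expects (any provider SDK can be adapted to this shape), and the toy embedding values are chosen just to make the search match; the service also reuses the TextChunker defined earlier, so tiktoken must be installed.

import asyncio

class FakeLLMClient:
    """Hypothetical client: toy embeddings and a canned answer."""
    async def embed(self, text: str) -> List[float]:
        return [float(len(text) % 7), 1.0, 0.5]

    async def chat(self, prompt: str) -> str:
        return "Answer grounded in the retrieved context."

async def demo():
    store = VectorStore()
    store.add(Document(doc_id="d1",
                       content="Our return policy allows refunds within 30 days.",
                       embedding=[5.0, 1.0, 0.5]))
    rag = RAGService(store, FakeLLMClient())
    result = await rag.query("What is the return policy?")
    print(result["answer"], result["sources"])

asyncio.run(demo())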
Conversation Memory Management
Multi-tier memory architecture
"""
对话记忆管理 — 短期记忆 + 长期记忆 + 摘要
"""
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json
@dataclass
class Message:
"""对话消息"""
role: str # system, user, assistant
content: str
timestamp: datetime = field(default_factory=datetime.now)
token_count: int = 0
metadata: dict = None
class ConversationMemory:
"""对话记忆管理器"""
def __init__(self, max_tokens: int = 4000, summary_threshold: float = 0.8):
self.messages: List[Message] = []
self.max_tokens = max_tokens
self.summary_threshold = summary_threshold
self.summary: Optional[str] = None
self._total_tokens = 0
def add_message(self, role: str, content: str, token_count: int = 0):
"""添加消息"""
msg = Message(
role=role,
content=content,
token_count=token_count or self._estimate_tokens(content)
)
self.messages.append(msg)
self._total_tokens += msg.token_count
# 超过阈值时触发摘要
if self._total_tokens > self.max_tokens * self.summary_threshold:
self._compress_memory()
def get_context(self, system_prompt: str = None) -> List[Dict]:
"""获取对话上下文(用于 LLM API)"""
context = []
if system_prompt:
context.append({"role": "system", "content": system_prompt})
# 如果有摘要,插入摘要作为系统消息
if self.summary:
context.append({
"role": "system",
"content": f"以下是之前对话的摘要:\n{self.summary}"
})
# 添加最近的对话
for msg in self.messages:
context.append({
"role": msg.role,
"content": msg.content
})
return context
def _compress_memory(self):
"""压缩记忆:保留最近的消息,对旧消息生成摘要"""
if len(self.messages) < 4:
return
# 保留最近的消息(至少保留最近 2 轮对话)
recent_count = min(4, len(self.messages))
old_messages = self.messages[:-recent_count]
recent_messages = self.messages[-recent_count:]
# 生成旧消息的摘要(实际项目中调用 LLM)
old_summary = self._generate_summary(old_messages)
if self.summary:
self.summary = f"{self.summary}\n\n后续对话摘要:\n{old_summary}"
else:
self.summary = old_summary
self.messages = recent_messages
self._total_tokens = sum(msg.token_count for msg in self.messages)
def _generate_summary(self, messages: List[Message]) -> str:
"""生成对话摘要(实际项目中调用 LLM)"""
# 简化实现:拼接关键内容
parts = []
for msg in messages:
if msg.role == "user":
parts.append(f"用户问:{msg.content[:100]}")
elif msg.role == "assistant":
parts.append(f"助手答:{msg.content[:100]}")
return "\n".join(parts)
@staticmethod
def _estimate_tokens(text: str) -> int:
"""粗略估算 Token 数量(中文约 1.5 字/token)"""
return int(len(text) / 1.5)
def clear(self):
"""清空记忆"""
self.messages.clear()
self.summary = None
self._total_tokens = 0
class LongTermMemory:
"""长期记忆存储(基于向量检索)"""
def __init__(self, vector_store: VectorStore):
self.vector_store = vector_store
async def store(self, user_id: str, key: str, content: str):
"""存储长期记忆"""
doc = Document(
doc_id=f"{user_id}_{key}",
content=content,
metadata={"user_id": user_id, "key": key}
)
self.vector_store.add(doc)
async def recall(self, user_id: str, query: str, top_k: int = 3) -> List[str]:
"""检索相关长期记忆"""
results = self.vector_store.search(
query_embedding=[], # 实际项目中用 embedding
top_k=top_k
)
return [doc.content for doc, score in results
if doc.metadata and doc.metadata.get("user_id") == user_id]流式响应
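A short round trip with the short-term memory. Note that summarization here uses the simplified local _generate_summary; a production system would call an LLM at that step.

memory = ConversationMemory(max_tokens=200, summary_threshold=0.8)
for i in range(6):
    memory.add_message("user", f"Question {i}: please explain topic {i} in detail.")
    memory.add_message("assistant", f"Answer {i}: here is a detailed explanation of topic {i}.")

# Older turns have been folded into the summary; only recent turns remain verbatim
print(f"Recent messages kept: {len(memory.messages)}, summary present: {memory.summary is not None}")
context = memory.get_context(system_prompt="You are a helpful assistant.")
print(context[0])  # system prompt first, then the summary, then the recent turns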
Streaming Responses
SSE streaming output
"""
流式响应 — Server-Sent Events (SSE)
降低首字节时间(TTFB),提升用户体验
"""
from typing import AsyncIterator, Callable, Optional
from dataclasses import dataclass
import asyncio
import json
import time
@dataclass
class StreamChunk:
"""流式响应块"""
content: str
token_count: int = 0
finish_reason: Optional[str] = None
timestamp: float = 0.0
class LLMStreamingService:
"""LLM 流式响应服务"""
def __init__(self, llm_client):
self.llm_client = llm_client
async def stream_chat(
self,
messages: list,
model: str = "gpt-4",
temperature: float = 0.7,
max_tokens: int = 2000,
on_chunk: Callable[[StreamChunk], None] = None
) -> AsyncIterator[str]:
"""
流式聊天 — 逐 Token 输出
"""
start_time = time.time()
total_tokens = 0
full_response = ""
# 模拟流式输出(实际项目中调用 OpenAI stream API)
async for chunk in self._call_llm_stream(messages, model, temperature, max_tokens):
total_tokens += chunk.get("token_count", 1)
content = chunk.get("content", "")
full_response += content
stream_chunk = StreamChunk(
content=content,
token_count=chunk.get("token_count", 1),
finish_reason=chunk.get("finish_reason"),
timestamp=time.time()
)
if on_chunk:
on_chunk(stream_chunk)
# SSE 格式输出
yield f"data: {json.dumps({'content': content, 'tokens': total_tokens}, ensure_ascii=False)}\n\n"
# 发送结束标记
elapsed = time.time() - start_time
yield f"data: {json.dumps({'done': True, 'total_tokens': total_tokens, 'elapsed_seconds': round(elapsed, 2)}, ensure_ascii=False)}\n\n"
async def _call_llm_stream(self, messages, model, temperature, max_tokens):
"""调用 LLM 流式 API(示例实现)"""
# 实际项目中使用:
# response = await openai.ChatCompletion.acreate(
# model=model,
# messages=messages,
# temperature=temperature,
# max_tokens=max_tokens,
# stream=True
# )
# async for chunk in response:
# delta = chunk.choices[0].delta
# if delta.content:
# yield {"content": delta.content, "token_count": 1}
pass
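A minimal local driver for the streaming service. It consumes the simulated stream above; with a real provider client, the same loop would print genuine model output.

import asyncio

async def main():
    service = LLMStreamingService(llm_client=None)  # the simulated stream needs no real client
    async for sse_line in service.stream_chat([{"role": "user", "content": "Hello"}]):
        print(sse_line.strip())

asyncio.run(main())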
# FastAPI integration example
"""
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    service = LLMStreamingService(llm_client)
    return StreamingResponse(
        service.stream_chat(request.messages),
        media_type="text/event-stream"
    )
"""

Token Counting and Cost Control
Token billing management
"""
Token 计数与成本控制
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import time
@dataclass
class TokenUsage:
"""Token 使用记录"""
model: str
input_tokens: int
output_tokens: int
cost_usd: float
timestamp: datetime = field(default_factory=datetime.now)
request_id: str = ""
user_id: str = ""
application: str = ""
class TokenCounter:
"""Token 计数器"""
# 模型定价(USD per 1K tokens,2024 年参考价格)
MODEL_PRICING = {
"gpt-4": {"input": 0.03, "output": 0.06},
"gpt-4-turbo": {"input": 0.01, "output": 0.03},
"gpt-4o": {"input": 0.005, "output": 0.015},
"gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"claude-3-opus": {"input": 0.015, "output": 0.075},
"claude-3-sonnet": {"input": 0.003, "output": 0.015},
"claude-3-haiku": {"input": 0.00025, "output": 0.00125},
}
def __init__(self):
self._usage_log: List[TokenUsage] = []
def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
"""计算单次请求成本"""
pricing = self.MODEL_PRICING.get(model)
if not pricing:
return 0.0
input_cost = (input_tokens / 1000) * pricing["input"]
output_cost = (output_tokens / 1000) * pricing["output"]
return round(input_cost + output_cost, 6)
def record_usage(
self,
model: str,
input_tokens: int,
output_tokens: int,
user_id: str = "",
application: str = ""
) -> TokenUsage:
"""记录使用"""
usage = TokenUsage(
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=self.calculate_cost(model, input_tokens, output_tokens),
user_id=user_id,
application=application
)
self._usage_log.append(usage)
return usage
def get_daily_cost(self, date: datetime = None) -> Dict:
"""获取每日成本统计"""
target_date = date or datetime.now()
day_usages = [u for u in self._usage_log
if u.timestamp.date() == target_date.date()]
total_cost = sum(u.cost_usd for u in day_usages)
total_input = sum(u.input_tokens for u in day_usages)
total_output = sum(u.output_tokens for u in day_usages)
by_model = {}
for u in day_usages:
if u.model not in by_model:
by_model[u.model] = {"cost": 0, "requests": 0, "tokens": 0}
by_model[u.model]["cost"] += u.cost_usd
by_model[u.model]["requests"] += 1
by_model[u.model]["tokens"] += u.input_tokens + u.output_tokens
return {
"date": target_date.strftime("%Y-%m-%d"),
"total_cost_usd": round(total_cost, 4),
"total_requests": len(day_usages),
"total_input_tokens": total_input,
"total_output_tokens": total_output,
"by_model": by_model
}
class CostController:
"""成本控制器"""
def __init__(
self,
daily_budget_usd: float = 10.0,
per_user_daily_limit: float = 1.0,
alert_threshold: float = 0.8
):
self.daily_budget = daily_budget_usd
self.per_user_limit = per_user_daily_limit
self.alert_threshold = alert_threshold
self.counter = TokenCounter()
def check_budget(self, user_id: str = "") -> dict:
"""检查预算"""
daily_stats = self.counter.get_daily_cost()
remaining = self.daily_budget - daily_stats["total_cost_usd"]
usage_ratio = daily_stats["total_cost_usd"] / self.daily_budget
return {
"allowed": remaining > 0 and usage_ratio < 1.0,
"daily_budget": self.daily_budget,
"used_today": daily_stats["total_cost_usd"],
"remaining": round(remaining, 4),
"usage_ratio": round(usage_ratio, 4),
"should_alert": usage_ratio >= self.alert_threshold
}
def select_model(self, task_complexity: str = "medium") -> str:
"""根据任务复杂度选择模型(成本优化)"""
model_selection = {
"simple": "gpt-4o-mini", # 简单任务用最便宜的
"medium": "gpt-4o", # 中等任务用标准模型
"complex": "gpt-4-turbo", # 复杂任务用高性能模型
"critical": "gpt-4" # 关键任务用最强模型
}
return model_selection.get(task_complexity, "gpt-4o")
# 使用示例
counter = TokenCounter()
usage = counter.record_usage("gpt-4o", input_tokens=500, output_tokens=200, user_id="user123")
print(f"成本:${usage.cost_usd:.6f}")
controller = CostController(daily_budget_usd=50.0)
budget = controller.check_budget()
print(f"预算状态:{'允许' if budget['allowed'] else '超额'},已用 ${budget['used_today']}")模型路由
Intelligent model selection
"""
模型路由 — 根据任务特征选择最合适的模型
"""
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import re
class TaskComplexity(Enum):
SIMPLE = "simple" # 分类、提取、简单问答
MEDIUM = "medium" # 摘要、翻译、改写
COMPLEX = "complex" # 推理、分析、创作
CRITICAL = "critical" # 代码生成、数学推理、专业领域
@dataclass
class ModelConfig:
"""模型配置"""
model_id: str
provider: str
max_tokens: int
cost_per_1k_input: float
cost_per_1k_output: float
latency_ms: int
quality_score: float # 1-10
capabilities: List[str]
class ModelRouter:
"""模型路由器 — 智能选择最优模型"""
def __init__(self):
self.models: Dict[str, ModelConfig] = {}
self._register_default_models()
def _register_default_models(self):
"""注册默认模型"""
default_models = [
ModelConfig("gpt-4o-mini", "openai", 128000, 0.00015, 0.0006, 200, 7.0,
["chat", "classification", "extraction"]),
ModelConfig("gpt-4o", "openai", 128000, 0.005, 0.015, 500, 9.0,
["chat", "reasoning", "code", "analysis"]),
ModelConfig("gpt-4-turbo", "openai", 128000, 0.01, 0.03, 800, 9.5,
["chat", "reasoning", "code", "math", "creative"]),
ModelConfig("claude-3-haiku", "anthropic", 200000, 0.00025, 0.00125, 150, 7.5,
["chat", "fast_response"]),
ModelConfig("claude-3-sonnet", "anthropic", 200000, 0.003, 0.015, 600, 9.0,
["chat", "reasoning", "code", "analysis"]),
]
for model in default_models:
self.models[model.model_id] = model
def route(
self,
prompt: str,
task_type: str = "chat",
max_cost: float = None,
max_latency_ms: int = None,
min_quality: float = None,
prefer_cheaper: bool = True
) -> ModelConfig:
"""路由到最合适的模型"""
# 1. 估算任务复杂度
complexity = self._estimate_complexity(prompt)
# 2. 筛选符合条件的模型
candidates = []
for model in self.models.values():
if task_type not in model.capabilities and "chat" not in model.capabilities:
continue
if max_cost and model.cost_per_1k_input > max_cost:
continue
if max_latency_ms and model.latency_ms > max_latency_ms:
continue
if min_quality and model.quality_score < min_quality:
continue
candidates.append(model)
if not candidates:
# 降级到最便宜的模型
return self.models.get("gpt-4o-mini")
# 3. 根据复杂度和偏好选择
if complexity == TaskComplexity.SIMPLE:
# 简单任务选最便宜的
candidates.sort(key=lambda m: m.cost_per_1k_input)
return candidates[0]
elif complexity == TaskComplexity.CRITICAL:
# 关键任务选质量最高的
candidates.sort(key=lambda m: m.quality_score, reverse=True)
return candidates[0]
else:
# 中等任务:在成本和质量之间平衡
if prefer_cheaper:
candidates.sort(key=lambda m: m.cost_per_1k_input * 10 / m.quality_score)
else:
candidates.sort(key=lambda m: m.quality_score, reverse=True)
return candidates[0]
def _estimate_complexity(self, prompt: str) -> TaskComplexity:
"""估算任务复杂度"""
complex_keywords = ["分析", "推理", "比较", "评估", "论证", "why", "analyze", "reason"]
critical_keywords = ["代码", "编程", "数学", "计算", "code", "math", "calculate"]
prompt_lower = prompt.lower()
for kw in critical_keywords:
if kw in prompt_lower:
return TaskComplexity.CRITICAL
for kw in complex_keywords:
if kw in prompt_lower:
return TaskComplexity.COMPLEX
if len(prompt) > 1000:
return TaskComplexity.MEDIUM
return TaskComplexity.SIMPLE
# 使用示例
router = ModelRouter()
# 简单问答 -> 选择便宜模型
model = router.route("今天天气怎么样?")
print(f"简单问答 -> {model.model_id} (成本: ${model.cost_per_1k_input}/1K)")
# 复杂分析 -> 选择强模型
model = router.route("请分析这篇论文的核心论点,并给出你的评价")
print(f"复杂分析 -> {model.model_id} (成本: ${model.cost_per_1k_input}/1K)")
# 代码生成 -> 选择最强模型
model = router.route("用 Python 实现一个高效的 LRU 缓存", task_type="code")
print(f"代码生成 -> {model.model_id} (质量: {model.quality_score})")语义缓存
Caching similar queries
"""
语义缓存 — 缓存语义相似的请求结果
"""
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
import json
@dataclass
class CacheEntry:
"""缓存条目"""
query: str
query_embedding: List[float]
response: str
model: str
created_at: datetime
hit_count: int = 0
ttl: timedelta = timedelta(hours=24)
@property
def is_expired(self) -> bool:
return datetime.now() - self.created_at > self.ttl
class SemanticCache:
"""语义缓存 — 基于语义相似度的缓存"""
def __init__(
self,
similarity_threshold: float = 0.92,
default_ttl: timedelta = timedelta(hours=24),
max_entries: int = 10000
):
self.similarity_threshold = similarity_threshold
self.default_ttl = default_ttl
self.max_entries = max_entries
self._cache: Dict[str, CacheEntry] = {}
def get(self, query_embedding: List[float]) -> Optional[CacheEntry]:
"""查找语义相似的缓存"""
best_match: Optional[CacheEntry] = None
best_score = 0.0
for entry in self._cache.values():
if entry.is_expired:
continue
similarity = self._cosine_similarity(query_embedding, entry.query_embedding)
if similarity > self.similarity_threshold and similarity > best_score:
best_score = similarity
best_match = entry
if best_match:
best_match.hit_count += 1
return best_match
return None
def set(
self,
query: str,
query_embedding: List[float],
response: str,
model: str,
ttl: timedelta = None
):
"""存储缓存"""
# 清理过期条目
self._cleanup()
# 如果超过最大条目数,移除最旧的
if len(self._cache) >= self.max_entries:
oldest_key = min(self._cache, key=lambda k: self._cache[k].created_at)
del self._cache[oldest_key]
cache_key = self._generate_key(query)
self._cache[cache_key] = CacheEntry(
query=query,
query_embedding=query_embedding,
response=response,
model=model,
created_at=datetime.now(),
ttl=ttl or self.default_ttl
)
def _cleanup(self):
"""清理过期条目"""
expired_keys = [k for k, v in self._cache.items() if v.is_expired]
for key in expired_keys:
del self._cache[key]
def get_stats(self) -> dict:
"""获取缓存统计"""
active = sum(1 for v in self._cache.values() if not v.is_expired)
total_hits = sum(v.hit_count for v in self._cache.values())
return {
"total_entries": len(self._cache),
"active_entries": active,
"total_hits": total_hits,
"avg_hits_per_entry": total_hits / max(len(self._cache), 1)
}
@staticmethod
def _cosine_similarity(a: List[float], b: List[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
norm_a = sum(x ** 2 for x in a) ** 0.5
norm_b = sum(x ** 2 for x in b) ** 0.5
return dot / (norm_a * norm_b + 1e-8)
@staticmethod
def _generate_key(query: str) -> str:
return hashlib.md5(query.encode()).hexdigest()评估指标
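A usage sketch for the semantic cache. The fake_embed helper is a crude stand-in for a real embedding model (letter-frequency vectors), so the similarity threshold is lowered here; a real deployment would keep a high threshold and a real embedding model.

cache = SemanticCache(similarity_threshold=0.9)

def fake_embed(text: str) -> List[float]:
    """Toy embedding: letter-frequency vector. Replace with a real embedding call."""
    vec = [0.0] * 26
    for ch in text.lower():
        if "a" <= ch <= "z":
            vec[ord(ch) - ord("a")] += 1.0
    return vec

q1 = "How do I reset my password?"
cache.set(q1, fake_embed(q1),
          response="Go to Settings -> Security -> Reset Password.", model="gpt-4o-mini")

# A paraphrased query should hit the cached entry
hit = cache.get(fake_embed("How can I reset my password?"))
print(hit.response if hit else "cache miss")
print(cache.get_stats())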
Evaluation Metrics
Assessing LLM output quality
"""
LLM 应用评估指标
"""
from typing import List, Dict
from dataclasses import dataclass
import math
@dataclass
class EvaluationResult:
"""评估结果"""
metric_name: str
score: float # 0-1
details: str
passed: bool
class LLMEvaluator:
"""LLM 输出评估器"""
@staticmethod
def evaluate_relevance(question: str, answer: str, context: str = "") -> EvaluationResult:
"""评估回答的相关性(基于关键词重叠)"""
question_words = set(question.lower().split())
answer_words = set(answer.lower().split())
# 问题关键词在回答中出现的比例
overlap = question_words & answer_words
score = len(overlap) / max(len(question_words), 1)
return EvaluationResult(
metric_name="relevance",
score=min(score * 2, 1.0), # 标准化到 0-1
details=f"关键词重叠率:{score:.2%},重叠词:{overlap}",
passed=score > 0.2
)
@staticmethod
def evaluate_completeness(question: str, answer: str) -> EvaluationResult:
"""评估回答的完整性"""
answer_len = len(answer)
# 简化的完整性评估:基于回答长度和结构
has_structure = any(marker in answer for marker in ["1.", "首先", "其次", "因此", "总之"])
has_adequate_length = answer_len > 50
score = 0.0
if has_adequate_length:
score += 0.5
if has_structure:
score += 0.3
if answer_len > 200:
score += 0.2
return EvaluationResult(
metric_name="completeness",
score=min(score, 1.0),
details=f"回答长度:{answer_len}字,有结构:{has_structure}",
passed=score >= 0.5
)
@staticmethod
def evaluate_hallucination(answer: str, context: str) -> EvaluationResult:
"""评估幻觉(回答与上下文的一致性)"""
if not context:
return EvaluationResult(
metric_name="hallucination",
score=0.5,
details="无上下文参考,无法评估幻觉",
passed=True
)
# 简化:检查回答中的关键事实是否在上下文中出现
answer_sentences = answer.replace("。", ".").replace("!", "!").split(".")
context_lower = context.lower()
supported = 0
total = len([s for s in answer_sentences if len(s.strip()) > 5])
for sentence in answer_sentences:
if len(sentence.strip()) <= 5:
continue
# 检查句子中的关键词是否在上下文中
words = [w for w in sentence.split() if len(w) > 2]
if any(w.lower() in context_lower for w in words):
supported += 1
score = supported / max(total, 1)
return EvaluationResult(
metric_name="hallucination",
score=score,
details=f"支持率:{score:.2%}({supported}/{total} 句有上下文支持)",
passed=score >= 0.6
)
def evaluate_all(self, question: str, answer: str, context: str = "") -> Dict:
"""全面评估"""
results = [
self.evaluate_relevance(question, answer, context),
self.evaluate_completeness(question, answer),
self.evaluate_hallucination(answer, context)
]
overall_score = sum(r.score for r in results) / len(results)
return {
"overall_score": round(overall_score, 3),
"passed": all(r.passed for r in results),
"metrics": [
{"name": r.metric_name, "score": r.score, "passed": r.passed, "details": r.details}
for r in results
]
}
# 使用示例
evaluator = LLMEvaluator()
result = evaluator.evaluate_all(
question="什么是机器学习?",
answer="机器学习是人工智能的一个子领域,它使计算机能够从数据中学习而无需显式编程。"
"机器学习主要包括监督学习、无监督学习和强化学习三种类型。",
context="机器学习是AI的重要分支..."
)
print(f"总评分:{result['overall_score']},通过:{result['passed']}")降级策略
Multi-layer fallback
"""
降级策略 — LLM 服务不可用时的兜底方案
"""
from typing import Optional, List
from dataclasses import dataclass
import time
@dataclass
class FallbackConfig:
"""降级配置"""
primary_model: str
fallback_models: List[str]
max_retries: int = 2
retry_delay_seconds: float = 1.0
timeout_seconds: float = 30.0
cache_fallback: bool = True
class LLMServiceWithFallback:
"""带降级策略的 LLM 服务"""
def __init__(self, llm_clients: dict, cache: SemanticCache, config: FallbackConfig):
self.llm_clients = llm_clients
self.cache = cache
self.config = config
self.circuit_breaker = {} # 简单熔断器
async def chat(self, messages: list, model: str = None) -> dict:
"""带降级的聊天请求"""
models_to_try = [model or self.config.primary_model] + self.config.fallback_models
for current_model in models_to_try:
# 检查熔断器
if self._is_circuit_open(current_model):
continue
# 检查缓存
if self.config.cache_fallback:
# 实际项目中用 query embedding 查缓存
pass
# 尝试调用
for attempt in range(self.config.max_retries):
try:
client = self.llm_clients.get(current_model)
if not client:
continue
response = await self._call_with_timeout(
client, messages, current_model
)
# 成功,重置熔断器
self._reset_circuit(current_model)
return response
except Exception as e:
print(f"模型 {current_model} 第 {attempt+1} 次尝试失败: {e}")
if attempt == self.config.max_retries - 1:
self._trip_circuit(current_model)
time.sleep(self.config.retry_delay_seconds)
# 所有模型都失败,返回兜底响应
return {
"response": "抱歉,服务暂时不可用,请稍后重试。",
"model": "fallback",
"fallback": True
}
async def _call_with_timeout(self, client, messages, model):
"""带超时的调用"""
# 实际实现中用 asyncio.wait_for
return await client.chat(messages, model=model)
def _is_circuit_open(self, model: str) -> bool:
"""检查熔断器是否打开"""
state = self.circuit_breaker.get(model)
if not state:
return False
if state["open"] and time.time() - state["tripped_at"] > 60:
# 半开状态,允许一次尝试
return False
return state.get("open", False)
def _trip_circuit(self, model: str):
"""触发熔断"""
self.circuit_breaker[model] = {
"open": True,
"tripped_at": time.time(),
"failure_count": self.circuit_breaker.get(model, {}).get("failure_count", 0) + 1
}
def _reset_circuit(self, model: str):
"""重置熔断器"""
self.circuit_breaker[model] = {"open": False, "failure_count": 0}架构总览
Architecture Overview
┌──────────────────────────────────────────────────┐
│                Client / Frontend                 │
│          (Web App / Mobile / CLI / API)          │
└──────────────────┬───────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────┐
│               API Gateway / BFF                  │
│ ┌──────────┐ ┌───────────────┐ ┌──────────────┐  │
│ │   Auth   │ │ Rate limiting │ │ Request logs │  │
│ └──────────┘ └───────────────┘ └──────────────┘  │
└──────────────────┬───────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────┐
│          LLM Application Service Layer           │
│ ┌──────────────────┐ ┌─────────────────────┐     │
│ │ Prompt templates │ │ Conversation memory │     │
│ └──────────────────┘ └─────────────────────┘     │
│ ┌──────────────────┐ ┌─────────────────────┐     │
│ │ Model routing    │ │ Semantic cache      │     │
│ └──────────────────┘ └─────────────────────┘     │
│ ┌──────────────────┐ ┌─────────────────────┐     │
│ │ Token billing    │ │ Fallback strategy   │     │
│ └──────────────────┘ └─────────────────────┘     │
└──────────────────┬───────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────┐
│                  RAG Pipeline                    │
│ ┌──────────┐ ┌───────────────┐ ┌───────────┐     │
│ │ Chunking │ │ Vector search │ │ Reranking │     │
│ └──────────┘ └───────────────┘ └───────────┘     │
└──────────────────┬───────────────────────────────┘
                   │
                   ▼
┌──────────────────────────────────────────────────┐
│               LLM Provider Layer                 │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌─────────────┐ │
│ │ OpenAI │ │ Claude │ │ Gemini │ │ Local models│ │
│ └────────┘ └────────┘ └────────┘ └─────────────┘ │
└──────────────────────────────────────────────────┘
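To tie the layers together, here is a minimal orchestration sketch: semantic cache first, then model routing, then the call with fallback, then billing. It assumes the ModelRouter, SemanticCache, TokenCounter, and LLMServiceWithFallback classes defined above plus a caller-supplied embed_fn, and the recorded token counts are rough placeholder estimates.

class LLMAppService:
    """Minimal composition of the layers above: cache -> route -> call -> bill."""

    def __init__(self, router: ModelRouter, cache: SemanticCache,
                 counter: TokenCounter, fallback: LLMServiceWithFallback, embed_fn):
        self.router = router
        self.cache = cache
        self.counter = counter
        self.fallback = fallback
        self.embed = embed_fn  # hypothetical embedding callable

    async def handle(self, user_id: str, prompt: str) -> dict:
        # 1. Semantic-cache lookup
        embedding = self.embed(prompt)
        hit = self.cache.get(embedding)
        if hit:
            return {"answer": hit.response, "model": hit.model, "cached": True}
        # 2. Route by task characteristics
        model = self.router.route(prompt)
        # 3. Call with retries, fallback models, and circuit breaking
        result = await self.fallback.chat([{"role": "user", "content": prompt}],
                                          model=model.model_id)
        answer = str(result.get("response", result))
        # 4. Record usage (rough token estimates) and populate the cache
        self.counter.record_usage(model.model_id, input_tokens=len(prompt) // 4,
                                  output_tokens=len(answer) // 4, user_id=user_id)
        self.cache.set(prompt, embedding, response=answer, model=model.model_id)
        return {"answer": answer, "model": model.model_id, "cached": False}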
Summary
The core of LLM application architecture is building determinism on top of uncertainty. With modular design (prompt management, memory management, model routing, caching, fallbacks), we can build stable, reliable applications on top of the LLM's probabilistic output. The key principles: use a cheap model when it suffices, serve from cache instead of regenerating, and degrade gracefully instead of erroring out.
Key takeaways
- Prompt template management ensures consistency and traceability
- Context window optimization is the central challenge of RAG
- Conversation memory must balance information content against token cost
- Model routing picks a model based on task complexity and cost budget
- Semantic caching can dramatically cut the cost of repeated queries
A project-delivery perspective
- Start with a simple architecture, then introduce RAG, caching, and routing step by step
- Token cost is an ongoing expense; set up monitoring from day one
- Build the evaluation system before launch, or you cannot measure quality
- A fallback strategy is a must-have safeguard in production
- Logging and tracing help you track down the problems users report
Common pitfalls
- Over-engineering: standing up a full RAG pipeline on day one
- Ignoring token cost and getting a huge bill at the end of the month
- Skipping evaluation, leaving answer quality unquantifiable
- Setting the cache similarity threshold too high, so the hit rate stays low
- Letting conversation memory grow without bound until it exceeds the context window
- No fallback, so the whole service goes down when the LLM API does
Where to go next
- Study the architecture patterns in LlamaIndex / LangChain
- Research multi-agent collaboration architectures (AutoGen, CrewAI)
- Explore strategies for combining fine-tuning with RAG
- Learn alignment techniques such as RLHF and DPO
- Understand LLM application security (jailbreaks, prompt injection)
Typical use cases
- Enterprise knowledge-base Q&A systems
- Intelligent customer service and chatbots
- Document analysis and information extraction
- Coding assistants and developer tools
- Content generation and creative assistance
Rollout advice
- Phase 1: API calls + prompt templates + basic monitoring
- Phase 2: RAG + conversation memory + streaming responses
- Phase 3: model routing + semantic caching + cost optimization
- Phase 4: evaluation system + multi-agent + automated testing
- Each phase needs explicit metrics and acceptance criteria
Troubleshooting checklist
- Poor answer quality: check the prompts, context, and model choice
- Response timeouts: check model routing, fallback strategy, and timeout settings
- Cost overruns: check cache hit rate, model usage distribution, and token counting
- Severe hallucination: check RAG retrieval quality and context relevance
- Lost memory: check the memory compression strategy and window size
Retrospective questions
- What are the application's core quality metrics, and how are they quantified?
- What is the average cost per request? Is there room to optimize?
- Has the fallback strategy been validated in a real outage?
- What is the semantic cache hit rate? Is the similarity threshold reasonable?
- How satisfied are users with answer quality, and what are the most frequent complaints?
