RAG 进阶实践
大约 10 分钟约 3122 字
RAG 进阶实践
简介
RAG(Retrieval-Augmented Generation)进入生产阶段后,难点通常不再是"能不能检索到文档",而是"能否稳定检索到对的内容,并让模型基于这些内容给出可验证答案"。进阶 RAG 关注的是切分策略、召回排序、混合检索、上下文压缩、评估体系和线上可观测性,这些因素往往比单纯更换大模型更影响最终效果。
RAG 的核心价值在于解决大语言模型的三个根本问题:知识时效性(模型训练数据的截止日期)、幻觉问题(生成看似正确但实际错误的信息)和领域专业性(通用模型在垂直领域知识不足)。一个设计良好的 RAG 系统应该让用户能够追溯答案来源、验证答案正确性,并且在检索失败时有明确的兜底策略。
特点
实现
文档切分与索引构建
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=80,
separators=["\n## ", "\n### ", "\n", "。", " ", ""]
)
embedding_model = SentenceTransformer("BAAI/bge-small-zh-v1.5")
client = QdrantClient(host="127.0.0.1", port=6333)
collection_name = "knowledge_base"
client.recreate_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=512, distance=Distance.COSINE)
)
documents = [
{"id": 1, "title": "退款规则", "content": "订单签收后 7 天内可申请退款,虚拟商品除外。"},
{"id": 2, "title": "发票说明", "content": "企业订单支持增值税专票,个人订单默认电子发票。"}
]
points = []
point_id = 1
for doc in documents:
chunks = splitter.split_text(doc["content"])
embeddings = embedding_model.encode(chunks, normalize_embeddings=True)
for chunk, vector in zip(chunks, embeddings):
points.append(PointStruct(
id=point_id,
vector=vector.tolist(),
payload={
"doc_id": doc["id"],
"title": doc["title"],
"chunk": chunk
}
))
point_id += 1
client.upsert(collection_name=collection_name, points=points)
print(f"indexed chunks: {len(points)}")不同切分策略对比
def compare_chunk_strategies():
"""
不同文档类型适合不同的切分策略
"""
strategies = {
"递归字符切分": {
"适用": "通用文档、Markdown、技术文档",
"chunk_size": "300-800 字符",
"overlap": "50-150 字符",
"优点": "保留段落和标题边界",
"缺点": "可能切断语义相关的句子"
},
"句子级切分": {
"适用": "FAQ、问答对、短文本",
"chunk_size": "1-5 个句子",
"overlap": "1 个句子",
"优点": "语义完整性最好",
"缺点": "chunk 太短,上下文不足"
},
"语义切分": {
"适用": "长文档、法律文书",
"chunk_size": "根据语义边界自动确定",
"overlap": "1-2 个语义段落",
"优点": "不会切断语义",
"缺点": "计算成本高,需要额外模型"
},
"按标题切分": {
"适用": "Markdown、技术文档",
"chunk_size": "按标题层级切分",
"overlap": "包含父标题上下文",
"优点": "保留文档结构信息",
"缺点": "需要文档有清晰的标题结构"
}
}
for name, info in strategies.items():
print(f"\n{name}:")
for key, value in info.items():
print(f" {key}: {value}")
compare_chunk_strategies()混合检索与 Rerank
from rank_bm25 import BM25Okapi
import jieba
corpus = [p.payload["chunk"] for p in points]
tokenized_corpus = [list(jieba.cut(text)) for text in corpus]
bm25 = BM25Okapi(tokenized_corpus)
query = "退款后发票怎么处理"
query_tokens = list(jieba.cut(query))
keyword_scores = bm25.get_scores(query_tokens)
vector_hits = client.search(
collection_name=collection_name,
query_vector=embedding_model.encode(query, normalize_embeddings=True).tolist(),
limit=5
)
keyword_topk = sorted(enumerate(keyword_scores), key=lambda x: x[1], reverse=True)[:5]
print("vector hits:", [hit.payload["chunk"] for hit in vector_hits])
print("keyword hits:", [corpus[idx] for idx, _ in keyword_topk])Reciprocal Rank Fusion (RRF)
def reciprocal_rank_fusion(results_list, weights=None):
"""
RRF(倒数排名融合):
一种简单有效的多路召回融合方法
score = Σ w_i / (k + rank_i),通常 k=60
"""
k = 60
if weights is None:
weights = [1.0] * len(results_list)
fused = {}
for results, weight in zip(results_list, weights):
for rank, item in enumerate(results):
if isinstance(item, tuple):
doc_id = item[0]
else:
doc_id = item
if doc_id not in fused:
fused[doc_id] = 0
fused[doc_id] += weight / (k + rank + 1)
return sorted(fused.items(), key=lambda x: x[1], reverse=True)
# 演示 RRF 融合
vector_results = [(1, 0.95), (3, 0.88), (5, 0.82), (2, 0.75)]
bm25_results = [(2, 3.2), (1, 2.8), (4, 2.1), (5, 1.5)]
fused = reciprocal_rank_fusion([vector_results, bm25_results], weights=[1.0, 0.8])
print("RRF 融合结果:", [(doc_id, round(score, 4)) for doc_id, score in fused])Query Rewrite 与 HyDE
def query_rewrite(query: str, llm) -> str:
"""
Query Rewrite(查询重写):
用 LLM 将用户的原始问题改写为更适合检索的形式
"""
prompt = f"""请将以下用户问题改写为一个更适合在知识库中检索的查询。
保留核心意图,去掉口语化表达,补充可能的同义词。
原始问题:{query}
改写后的查询:"""
return llm.generate(prompt)
def hyde_retrieval(query: str, llm, retriever, k=3):
"""
HyDE(Hypothetical Document Embedding):
先让 LLM 生成一个假设性回答,然后用这个回答的嵌入去检索
适用于查询和文档语义空间不一致的场景
"""
prompt = f"请根据以下问题,写一段可能的答案(即使不确定也请写):\n{query}"
hypothetical_answer = llm.generate(prompt)
# 用假设性回答的嵌入进行检索
query_embedding = embed(hypothetical_answer)
results = retriever.search(query_embedding, k=k)
return results
print("Query Rewrite 和 HyDE 策略已定义")上下文压缩与答案生成
# 组装上下文时做压缩,避免把无关 chunk 全塞给模型
max_context_chars = 1200
selected_chunks = []
current_length = 0
for text, _ in reranked:
if current_length + len(text) > max_context_chars:
break
selected_chunks.append(text)
current_length += len(text)
context = "\n\n".join(selected_chunks)
prompt = f"""
你是企业知识库助手。请严格基于给定资料回答,不要编造。
如果资料不足,请明确说明"资料未覆盖"。
问题:{query}
资料:
{context}
"""
print(prompt)RAG 评估体系
# 检索评估:Recall@K / MRR
qa_pairs = [
{"question": "退款后发票怎么处理", "gold_doc_id": 2},
{"question": "订单签收几天内可退款", "gold_doc_id": 1}
]
hits = 0
mrr_total = 0.0
for item in qa_pairs:
search_hits = client.search(
collection_name=collection_name,
query_vector=embedding_model.encode(item["question"], normalize_embeddings=True).tolist(),
limit=5
)
found_rank = None
for rank, hit in enumerate(search_hits, start=1):
if hit.payload["doc_id"] == item["gold_doc_id"]:
found_rank = rank
break
if found_rank is not None:
hits += 1
mrr_total += 1 / found_rank
recall_at_5 = hits / len(qa_pairs)
mrr = mrr_total / len(qa_pairs)
print({"recall@5": recall_at_5, "mrr": round(mrr, 4)})RAG 三层评估框架
class RAGEvaluator:
"""
RAG 系统三层评估:
1. 检索质量:Recall@K, MRR, NDCG
2. 上下文质量:相关性、完整性、噪声比例
3. 答案质量:正确性、引用性、幻觉检测
"""
def evaluate(self, qa_pairs, retriever, generator):
results = []
for qa in qa_pairs:
# 检索
retrieved_docs = retriever.search(qa["question"], k=5)
retrieval_hit = qa["gold_doc_id"] in [d["doc_id"] for d in retrieved_docs]
# 生成
context = "\n".join([d["chunk"] for d in retrieved_docs[:3]])
answer = generator.generate(qa["question"], context)
results.append({
"question": qa["question"],
"retrieval_hit": retrieval_hit,
"has_citation": "[" in answer,
"gold_answer": qa.get("gold_answer", ""),
"generated_answer": answer,
})
return results
def report(self, results):
retrieval_hit_rate = sum(r["retrieval_hit"] for r in results) / len(results)
citation_rate = sum(r["has_citation"] for r in results) / len(results)
print(f"检索命中率: {retrieval_hit_rate:.2%}")
print(f"引用率: {citation_rate:.2%}")
return {"retrieval_hit_rate": retrieval_hit_rate, "citation_rate": citation_rate}
print("RAG 三层评估框架已定义")Parent-Child Retrieval
def parent_child_retrieval(documents, child_chunk_size=200, parent_chunk_size=800):
"""
Parent-Child Retrieval(父子块检索):
- 将文档切分为较大的父块(用于上下文)
- 将每个父块再切分为较小的子块(用于精确检索)
- 检索时在子块上匹配,返回对应的父块作为上下文
优势:既保证了检索精度,又保证了上下文完整性
"""
parent_chunks = []
child_chunks = []
child_to_parent = {}
for doc in documents:
# 先切分为父块
parents = split_text(doc, parent_chunk_size)
for parent_idx, parent in enumerate(parents):
parent_id = f"parent_{len(parent_chunks)}"
parent_chunks.append({"id": parent_id, "text": parent})
# 再将父块切分为子块
children = split_text(parent, child_chunk_size)
for child in children:
child_id = f"child_{len(child_chunks)}"
child_chunks.append({"id": child_id, "text": child})
child_to_parent[child_id] = parent_id
print(f"父块数: {len(parent_chunks)}, 子块数: {len(child_chunks)}")
print(f"子块到父块的映射关系数: {len(child_to_parent)}")
return parent_chunks, child_chunks, child_to_parent
def split_text(text, chunk_size):
"""简化的文本切分"""
chunks = []
for i in range(0, len(text), chunk_size):
chunks.append(text[i:i+chunk_size])
return chunks
print("Parent-Child Retrieval 策略已定义")Multi-Query Retrieval
def multi_query_retrieval(query, llm, retriever, num_queries=3):
"""
Multi-Query Retrieval(多查询检索):
用 LLM 从不同角度改写用户查询,
对每个改写后的查询分别检索,
最后合并去重结果
"""
prompt = f"""请从不同角度改写以下查询,生成 {num_queries} 个不同的检索查询。
每个查询应该从不同侧面表达相同的信息需求。
原始查询:{query}
请输出 {num_queries} 个改写后的查询,每行一个:"""
rewritten_queries = llm.generate(prompt).strip().split('\n')
all_results = {}
for q in rewritten_queries[:num_queries]:
results = retriever.search(q, k=3)
for doc in results:
doc_id = doc["id"]
if doc_id not in all_results or doc["score"] > all_results[doc_id]["score"]:
all_results[doc_id] = doc
final_results = sorted(all_results.values(), key=lambda x: x["score"], reverse=True)
print(f"原始查询: {query}")
print(f"改写查询: {rewritten_queries}")
print(f"合并结果数: {len(final_results)}")
return final_results[:5]
print("Multi-Query Retrieval 策略已定义")线上观测与可回溯性
# 线上观测建议
trace_log = {
"query": query,
"retrieved_doc_ids": [hit.payload["doc_id"] for hit in vector_hits],
"reranked_context": selected_chunks,
"answer_model": "claude/other-llm",
"latency_ms": 842,
"token_in": 1250,
"token_out": 210
}
print(trace_log)RAG 成本优化
class RAGCostTracker:
"""RAG 系统成本追踪"""
def __init__(self):
self.embedding_cost = 0
self.rerank_cost = 0
self.generation_cost = 0
self.total_queries = 0
def track_query(self, embedding_tokens, rerank_tokens, prompt_tokens, completion_tokens):
"""记录一次查询的成本"""
# 假设定价(每百万 token)
self.embedding_cost += embedding_tokens / 1_000_000 * 0.1 # $0.1/1M
self.rerank_cost += rerank_tokens / 1_000_000 * 1.0 # $1.0/1M
self.generation_cost += (prompt_tokens + completion_tokens) / 1_000_000 * 5.0 # $5.0/1M
self.total_queries += 1
def summary(self):
total = self.embedding_cost + self.rerank_cost + self.generation_cost
return {
"total_queries": self.total_queries,
"embedding_cost": f"${self.embedding_cost:.4f}",
"rerank_cost": f"${self.rerank_cost:.4f}",
"generation_cost": f"${self.generation_cost:.4f}",
"total_cost": f"${total:.4f}",
"avg_cost_per_query": f"${total / max(1, self.total_queries):.4f}"
}
tracker = RAGCostTracker()
tracker.track_query(500, 300, 1200, 200)
tracker.track_query(500, 300, 800, 150)
print(tracker.summary())优点
缺点
总结
RAG 的进阶优化重点在"召回是否准、上下文是否干净、答案是否可验证"。实践中通常先把切分、混合检索、rerank 和评估集搭起来,再逐步调整模型与 Prompt;否则只换大模型,常常治标不治本。进阶 RAG 不是一味追求更复杂的架构,而是在每个环节都做到"足够好":切分保持语义完整、检索覆盖多种匹配模式、排序过滤噪声、生成基于证据、评估覆盖全链路。
关键知识点
- chunk 粒度直接影响召回质量和上下文噪声。
- 仅用向量检索时,关键词精确匹配类问题常常会漏召回。
- rerank 往往比单纯增大 topK 更有效。
- 评估要拆成检索评估和生成评估两层。
项目落地视角
- 企业知识库问答要优先解决文档切分和更新同步问题。
- 客服 FAQ 场景可用 BM25 + 向量混合检索提升命中率。
- 合规/制度问答应强制引用片段,避免自由发挥。
- 线上必须记录 query、召回片段、答案和反馈,便于持续优化。
常见误区
- 认为换更大的 LLM 就能自动修复差检索。
- chunk 切得过大,导致模型拿到大量无关内容。
- 没有 gold set,只靠主观感觉判断 RAG 好坏。
- 检索链路出错时,一味改 Prompt 或模型参数。
进阶路线
- 学习 HyDE、Query Rewrite、Multi-Query Retrieval。
- 引入 Parent-Child Retrieval、Contextual Compression。
- 建立自动化 RAG Eval 流水线。
- 研究 Graph RAG、Agentic RAG 等更复杂方案。
适用场景
- 企业内部知识库问答。
- 政策、合同、制度、帮助中心检索问答。
- 客服辅助、工单问答、研发文档搜索。
- 多文档、多来源、需要可追溯答案的场景。
落地建议
- 先固定一套评估问题集,再优化检索链路。
- 文档入库时保留来源、标题、段落位置等元数据。
- 对高风险回答增加引用和"资料不足"兜底逻辑。
- 把召回、rerank、生成分层记录日志,便于定位瓶颈。
排错清单
- 检查 chunk 是否过大/过小,是否破坏语义边界。
- 检查 embedding 模型是否适合当前语言与领域。
- 检查 topK、rerank 和上下文压缩是否丢掉关键信息。
- 检查错误答案到底是检索错、排序错还是生成错。
复盘问题
- 当前 RAG 的主要瓶颈是在召回、排序还是生成?
- 哪些查询更适合关键词检索,哪些更适合向量检索?
- 如果 Token 成本翻倍,效果是否真的明显提升?
- 线上日志是否足以支持你回放一次失败问答?
