LangChain 应用开发
大约 12 分钟约 3485 字
LangChain 应用开发
简介
LangChain 是一个面向大模型应用编排的框架,它把 Prompt、模型调用、检索、记忆、工具调用和链式执行组合在一起,让开发者更方便地搭建知识问答、RAG、Agent、总结归纳和工作流类 AI 应用。它的真正价值不在于"API 很多",而在于能把一系列 AI 组件组织成可测试、可扩展、可观察的执行链路。
LangChain 的核心设计哲学是"组合优于定制":将复杂的 AI 应用拆解为独立的、可复用的组件(Prompt 模板、模型接口、检索器、输出解析器等),然后通过链式组合将它们串联起来。这种设计使得开发者可以快速搭建原型,同时保持代码的可维护性和可测试性。但需要注意的是,LangChain 的 API 变化较快,理解核心抽象比死记具体 API 更重要。
特点
实现
基础模型调用与 Prompt 组合
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.2,
api_key="your-api-key"
)
response = llm.invoke([
SystemMessage(content="你是一个 .NET 技术顾问"),
HumanMessage(content="解释 ASP.NET Core 中间件管道的作用")
])
print(response.content)from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个{role},擅长{domain}领域。"),
("human", "{question}")
])
chain = prompt | llm
result = chain.invoke({
"role": "高级架构师",
"domain": "微服务架构",
"question": "如何设计服务间通信?"
})
print(result.content)LangChain 最核心的一点:
不是"调用模型",
而是把输入、上下文、检索、工具和输出解析组织成链。LCEL(LangChain Expression Language)
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
# LCEL 是 LangChain 的核心编排语言
# 使用管道符 | 连接组件,类似 Unix 管道
# 基础链:Prompt → Model → Parser
basic_chain = prompt | llm | StrOutputParser()
# 并行执行
parallel_chain = RunnableParallel(
summary=prompt | llm | StrOutputParser(),
keywords=prompt | llm | StrOutputParser()
)
# 带中间步骤的链
analysis_chain = (
{"text": RunnablePassthrough()}
| RunnableParallel(
sentiment=RunnablePassthrough() | llm,
keywords=RunnablePassthrough() | llm
)
)
print("LCEL 编排模式已定义")
print("核心思想:通过 | 管道符组合可运行组件")对话记忆与多轮上下文
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
store = {}
def get_session_history(session_id: str):
if session_id not in store:
store[session_id] = ChatMessageHistory()
return store[session_id]
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个企业助手,请记住用户上下文。"),
MessagesPlaceholder(variable_name="history"),
("human", "{question}")
])
chain = prompt | llm
chain_with_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="question",
history_messages_key="history"
)
resp1 = chain_with_history.invoke(
{"question": "我叫张三,是后端开发工程师"},
config={"configurable": {"session_id": "u-1001"}}
)
resp2 = chain_with_history.invoke(
{"question": "我刚才说我是做什么的?"},
config={"configurable": {"session_id": "u-1001"}}
)
print(resp1.content)
print(resp2.content)记忆适合:
- 对话连续性
- 用户偏好保存
- 多轮任务上下文
但不等于长期知识库存储。持久化记忆(Redis/数据库)
from langchain_community.chat_message_histories import RedisChatMessageHistory
# 使用 Redis 存储对话历史(生产环境推荐)
def get_redis_history(session_id: str) -> RedisChatMessageHistory:
return RedisChatMessageHistory(
session_id=session_id,
url="redis://localhost:6379/0",
key_prefix="chat_history:"
)
# 对话历史管理策略
memory_strategies = {
"全部保留": "适合短对话,但上下文窗口有限制",
"滑动窗口": "保留最近 N 轮对话,平衡上下文和成本",
"摘要压缩": "用 LLM 定期压缩历史,保留关键信息",
"实体记忆": "只提取和保留关键实体信息,最省 Token",
}
for strategy, desc in memory_strategies.items():
print(f"{strategy}: {desc}")RAG:文档问答链路
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
loader = TextLoader("knowledge.txt", encoding="utf-8")
docs = loader.load()
splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50,
separators=["\n\n", "\n", "。", "!", "?", ".", " "]
)
chunks = splitter.split_documents(docs)
embeddings = OpenAIEmbeddings(api_key="your-api-key")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./chroma_db"
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})rag_prompt = ChatPromptTemplate.from_template("""
根据以下上下文回答问题。如果上下文中没有答案,请明确说"资料未覆盖"。
上下文:
{context}
问题:{question}
""")
def format_docs(documents):
return "\n\n".join(doc.page_content for doc in documents)
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| rag_prompt
| llm
| StrOutputParser()
)
answer = rag_chain.invoke("什么是微服务架构?")
print(answer)RAG 链路的重点通常不在"模型换哪个",而在:
- chunk 怎么切
- embedding 是否合适
- topK 召回是否命中
- prompt 是否要求基于上下文回答多种文档加载器
# LangChain 支持多种文档格式
document_loaders = {
"TextLoader": "纯文本文件 (.txt)",
"PyPDFLoader": "PDF 文件",
"Docx2txtLoader": "Word 文档 (.docx)",
"UnstructuredHTMLLoader": "HTML 网页",
"CSVLoader": "CSV 表格",
"JSONLoader": "JSON 数据",
"WebBaseLoader": "网页爬取",
"YouTubeLoader": "YouTube 字幕",
}
for loader, desc in document_loaders.items():
print(f"{loader}: {desc}")Tool Use / Agent 接入思路
from langchain_core.tools import tool
@tool
def search_user(user_name: str) -> str:
"""查询用户信息,输入为用户名"""
fake_users = {
"张三": "用户ID=1001, 部门=研发部, 角色=高级工程师",
"李四": "用户ID=1002, 部门=产品部, 角色=产品经理"
}
return fake_users.get(user_name, f"未找到用户: {user_name}")
@tool
def calculator(expression: str) -> str:
"""执行简单数值计算"""
return str(eval(expression))# 这里不追求把 Agent API 写得最全,重点是理解:
# LangChain 可以把模型 + 工具 + 结构化输出 + 记忆拼成一套可运行链路结构化输出解析
from pydantic import BaseModel, Field
from langchain_core.output_parsers import JsonOutputParser
class CodeReview(BaseModel):
issues: list[str] = Field(description="发现的问题")
score: int = Field(description="质量评分 1-10")
summary: str = Field(description="总体结论")
parser = JsonOutputParser(pydantic_object=CodeReview)
prompt = ChatPromptTemplate.from_messages([
("system", "你是代码审查专家。请按指定 JSON 输出。{format_instructions}"),
("human", "请审查下面这段代码:\n{code}")
])
chain = prompt | llm | parser
# result = chain.invoke({
# "code": "public void Process() { ... }",
# "format_instructions": parser.get_format_instructions()
# })结构化输出比自由文本更适合:
- 前后端联动
- 自动化处理
- 结果校验
- 失败重试链路可观测性
from langchain.callbacks import get_openai_callback
# 使用回调追踪 Token 使用和成本
with get_openai_callback() as cb:
result = basic_chain.invoke({"role": "架构师", "domain": "分布式", "question": "CAP 定理"})
print(f"总 Token: {cb.total_tokens}")
print(f"Prompt Token: {cb.prompt_tokens}")
print(f"Completion Token: {cb.completion_tokens}")
print(f"总成本: ${cb.total_cost:.4f}")LangGraph 状态图编排
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
# LangGraph 是 LangChain 的新一代编排框架
# 用状态图替代简单的链式调用,支持循环、分支和人工介入
class RAGState(TypedDict):
"""RAG 流程的状态定义"""
question: str
documents: list
context: str
answer: str
retrieval_count: int
def retrieve(state: RAGState):
"""检索节点"""
docs = retriever.invoke(state["question"])
return {"documents": docs, "context": "\n".join([d.page_content for d in docs[:3]]), "retrieval_count": len(docs)}
def generate(state: RAGState):
"""生成节点"""
response = llm.invoke(rag_prompt.invoke({"question": state["question"], "context": state["context"]}))
return {"answer": response.content}
def should_retrieve(state: RAGState):
"""判断是否需要重新检索"""
if state["retrieval_count"] < 3:
return "retrieve"
return "generate"
# 构建状态图
graph = StateGraph(RAGState)
graph.add_node("retrieve", retrieve)
graph.add_node("generate", generate)
graph.add_conditional_edges("retrieve", should_retrieve, {"retrieve": "retrieve", "generate": "generate"})
graph.add_edge("generate", END)
app = graph.compile()
print("LangGraph 状态图编排已定义")
print("优势:支持循环、条件分支、人工审核等复杂流程")缓存与性能优化
from langchain.cache import InMemoryCache
from langchain.globals import set_llm_cache
# 启用 LLM 响应缓存(相同输入直接返回缓存结果)
set_llm_cache(InMemoryCache())
# 适用于:
# - 开发调试阶段减少 API 调用
# - 高频重复查询的在线服务
# - 评估集重复运行
# 注意:生产环境应使用 Redis 等外部缓存
print("LLM 缓存已启用")错误处理与重试
from langchain_core.runnables import RunnableWithFallbacks
from langchain_core.runnables import RunnableRetryable
# 自动重试(应对 API 限流或临时错误)
retryable_chain = basic_chain.with_retry(
stop_after_attempt=3,
retry_if_exception=lambda e: isinstance(e, Exception)
)
# 降级方案(主模型失败时切换到备用模型)
backup_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
backup_chain = prompt | backup_llm | StrOutputParser()
fallback_chain = basic_chain.with_fallbacks([backup_chain])
print("重试和降级链已定义")优点
缺点
总结
LangChain 的真正价值,不是"帮你调模型",而是把 AI 应用的多个环节抽象成可组合组件。只要你把问题拆成输入、上下文、检索、工具、输出这几层,LangChain 就很适合作为应用编排框架;但如果只会"往上堆链",而不理解每层责任,很容易做出复杂却不稳定的系统。
关键知识点
- LangChain 最核心的是链式编排思想,而不只是某个 API 名字。
- Prompt、检索、输出解析器往往比模型参数更影响最终效果。
- 记忆不等于知识库,RAG 也不等于 Agent。
- 结构化输出是很多生产 AI 系统的关键稳定器。
- LCEL(管道符语法)是 LangChain 的核心编排方式。
项目落地视角
- 企业知识问答通常先从 LangChain + RAG 起步。
- 对话助手则更常配合记忆、工具调用和会话管理使用。
- 复杂系统里要把 Prompt、检索、输出解析和日志一起治理。
- LangChain 更适合快速试验和应用编排,不应替代系统级工程设计。
常见误区
- 认为用上 LangChain 就天然具备 Agent 或 RAG 能力。
- 把所有逻辑都塞进一条超长链,最后既慢又难调试。
- 只看模型输出,不分析检索和解析环节是否出错。
- Prompt 一变就感觉效果变化很大,但没有评估集做验证。
进阶路线
- 深入学习 LangGraph、Tool Calling、Agent 编排。
- 结合向量数据库、缓存和日志平台做更稳定的 RAG 系统。
- 为链路建立观测:召回结果、Prompt 版本、Token 消耗、失败样例。
- 根据场景决定是否从 LangChain 逐步演进到更定制化的自研编排。
适用场景
- RAG 知识问答。
- 企业内部 Copilot。
- 信息抽取与结构化输出。
- Prompt + 检索 + 工具联动型 AI 应用。
落地建议
- 从最小链路开始:Prompt → Model → Output Parser。
- 做 RAG 时优先把 chunk、召回和评估集补齐。
- 复杂 Agent 场景先确认是否真的需要多步规划和工具编排。
- 对每条链建立失败样例和结果回放机制。
排错清单
- 输出不稳定:先检查 Prompt 和温度,而不是先换框架。
- 回答偏题:先检查检索和上下文是否命中。
- JSON 解析失败:先检查输出格式约束是否足够强。
- 延迟高:先看链路层数、检索耗时和模型调用次数。
自定义 Tool 与多 Agent 协作
LangChain 的 Agent 机制允许 LLM 自主选择和调用工具。下面展示自定义工具的完整实现。
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from typing import Optional
# 定义工具输入 Schema
class WeatherInput(BaseModel):
city: str = Field(description="城市名称")
date: Optional[str] = Field(default=None, description="日期,格式 YYYY-MM-DD")
@tool(args_schema=WeatherInput)
def get_weather(city: str, date: Optional[str] = None) -> str:
"""查询指定城市的天气信息"""
# 实际项目中调用天气 API
weather_data = {
"北京": "晴天,温度 15-25°C",
"上海": "多云,温度 18-22°C",
"深圳": "小雨,温度 22-28°C",
}
return weather_data.get(city, f"未找到 {city} 的天气数据")
@tool
def search_database(query: str, limit: int = 5) -> str:
"""在数据库中搜索业务数据"""
# 模拟数据库查询
results = [
{"id": 1, "name": "产品A", "stock": 100},
{"id": 2, "name": "产品B", "stock": 50},
]
return str(results[:limit])
# 工具注册表模式 — 便于管理大量工具
class ToolRegistry:
"""工具注册表"""
_tools: dict[str, callable] = {}
@classmethod
def register(cls, name: str):
def decorator(func):
cls._tools[name] = func
return func
return decorator
@classmethod
def get_all_tools(cls) -> list:
return list(cls._tools.values())
# 使用 create_react_agent(LangGraph 方式)
from langgraph.prebuilt import create_react_agent
# model = ChatOpenAI(model="gpt-4o-mini")
# tools = [get_weather, search_database]
# agent = create_react_agent(model, tools)
# result = agent.invoke({"messages": [("user", "北京今天天气怎么样?")]})RAG 进阶:混合检索与重排序
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
def hybrid_search_example():
"""混合检索策略:向量检索 + 关键词检索"""
# 1. 向量检索(语义相似度)
vector_store = Chroma(
embedding_function=OpenAIEmbeddings(),
persist_directory="./chroma_db"
)
vector_results = vector_store.similarity_search_with_score(
query="如何设计微服务架构",
k=10
)
# 2. 关键词检索(BM25,需要 rank_bm25 库)
# from rank_bm25 import BM25Okapi
# bm25 = BM25Okapi(tokenized_corpus)
# bm25_results = bm25.get_top_n(tokenized_query, corpus, n=10)
# 3. 融合排序(Reciprocal Rank Fusion)
def reciprocal_rank_fusion(lists_of_results, k=60):
"""RRF 融合多个检索结果"""
fused_scores = {}
for results in lists_of_results:
for rank, doc in enumerate(results):
if doc not in fused_scores:
fused_scores[doc] = 0
fused_scores[doc] += 1.0 / (rank + k)
# 按融合分数排序
return sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
# 4. 重排序(Re-ranking)— 使用 Cross-Encoder 或 LLM
# from langchain.retrievers import ContextualCompressionRetriever
# from langchain.retrievers.document_compressors import LLMChainExtractor
# compressor = LLMChainExtractor.from_llm(llm)
# compression_retriever = ContextualCompressionRetriever(
# base_compressor=compressor,
# base_retriever=vector_store.as_retriever()
# )
print("RAG 优化策略:")
print(" 1. 分块策略:按段落/语义边界切分,chunk_size=500-1000")
print(" 2. 混合检索:向量 + BM25,取长补短")
print(" 3. 重排序:用 Cross-Encoder 或 LLM 对召回结果精排")
print(" 4. 查询改写:用 LLM 将用户问题改写为更适合检索的表述")
print(" 5. 父文档检索:返回小 chunk 对应的完整文档段落")
hybrid_search_example()评估与测试
# LangChain 提供了评估框架,帮助量化 RAG 和 Chain 的效果
def evaluation_example():
"""RAG 评估指标"""
metrics = {
"Faithfulness(忠实度)": "回答是否严格基于检索到的上下文,没有编造",
"Answer Relevancy(答案相关性)": "回答是否与问题相关",
"Context Precision(上下文精确度)": "检索到的上下文中有用信息的比例",
"Context Recall(上下文召回率)": "回答所需信息是否都被检索到",
}
print("RAG 评估指标体系:")
for metric, desc in metrics.items():
print(f" {metric}: {desc}")
# 常用评估工具
tools = {
"Ragas": "专门的 RAG 评估框架,支持上述全部指标",
"LangSmith": "LangChain 官方可观测性平台,追踪每步执行",
"DeepEval": "集成到 pytest 的 LLM 评估框架",
"自定义评估集": "准备 Q&A 对,自动比较生成答案与标准答案",
}
print("\n评估工具推荐:")
for tool, desc in tools.items():
print(f" {tool}: {desc}")
evaluation_example()