Appearance
复合链
复合链是将多个基础链组合在一起的链类型,它允许开发者构建更复杂的工作流。在 LangChain 1.2 中,使用 LCEL(LangChain Expression Language)语法可以更灵活地组合链。
安装依赖
在开始使用复合链之前,需要安装以下依赖:
bash
pip install langchain langchain-openai python-dotenv什么是复合链?
复合链是由多个链组成的链,这些链按顺序或并行执行。在 LangChain 1.2 中,使用 LCEL 语法可以轻松地组合多个链,形成更复杂的工作流。
顺序链
使用 LCEL 语法将多个链按顺序组合在一起。
初始化顺序链
python
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI
from langchain_core.output_parsers import StrOutputParser
# 创建第一个提示词模板
prompt1 = PromptTemplate(
template="What is a good name for a {product} company that makes {description}?",
input_variables=["product", "description"]
)
# 创建第二个提示词模板
prompt2 = PromptTemplate(
template="Write a tagline for a company named {company_name} that makes {description}.",
input_variables=["company_name", "description"]
)
# 初始化语言模型
llm = OpenAI(api_key="your-api-key")
# 创建输出解析器
output_parser = StrOutputParser()
# 创建第一个链
chain1 = prompt1 | llm | output_parser
# 创建第二个链,使用第一个链的输出
chain2 = (
{"company_name": chain1, "description": lambda x: x["description"]}
| prompt2
| llm
| output_parser
)使用顺序链
python
# 运行复合链
response = chain2.invoke({"product": "tech", "description": "smartphones"})
print("Response:", response)使用 RunnableSequence
RunnableSequence 是 LCEL 中用于顺序执行多个 Runnable 的类。
初始化 RunnableSequence
python
from langchain_core.runnables import RunnableSequence
# 创建两个链
chain1 = prompt1 | llm | output_parser
chain2 = prompt2 | llm | output_parser
# 创建 RunnableSequence
# 注意:需要确保 chain1 的输出包含 chain2 所需的输入
sequential_chain = RunnableSequence(
first=chain1,
middle=[],
last=chain2
)使用 RunnableSequence
python
# 运行顺序链
response = sequential_chain.invoke({"product": "tech", "description": "smartphones"})
print(response)路由链
在 LangChain 1.2 中,可以使用自定义逻辑实现路由功能。
初始化路由链
python
from langchain_core.runnables import RunnableBranch, RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI
from langchain_core.output_parsers import StrOutputParser
# 初始化语言模型
llm = OpenAI(api_key="your-api-key")
output_parser = StrOutputParser()
# 定义不同的提示词模板
physics_prompt = PromptTemplate(
template="""You are a helpful assistant who knows a lot about physics.
{input}
""",
input_variables=["input"]
)
math_prompt = PromptTemplate(
template="""You are a helpful assistant who knows a lot about math.
{input}
""",
input_variables=["input"]
)
default_prompt = PromptTemplate(
template="""You are a helpful assistant.
{input}
""",
input_variables=["input"]
)
# 创建各个链
physics_chain = physics_prompt | llm | output_parser
math_chain = math_prompt | llm | output_parser
default_chain = default_prompt | llm | output_parser
# 创建路由函数
def route_input(input_data: dict) -> str:
input_text = input_data["input"].lower()
if "physics" in input_text or "relativity" in input_text or "quantum" in input_text:
return "physics"
elif "math" in input_text or "derivative" in input_text or "equation" in input_text:
return "math"
else:
return "default"
# 创建路由链
branch = RunnableBranch(
(lambda x: route_input(x) == "physics", physics_chain),
(lambda x: route_input(x) == "math", math_chain),
default_chain
)使用路由链
python
# 运行路由链
response1 = branch.invoke({"input": "What is the theory of relativity?"})
print("Physics response:", response1)
response2 = branch.invoke({"input": "What is the derivative of x^2?"})
print("Math response:", response2)
response3 = branch.invoke({"input": "What is LangChain?"})
print("Default response:", response3)并行链
使用 RunnableParallel 同时执行多个链。
初始化并行链
python
from langchain_core.runnables import RunnableParallel
# 创建多个并行的链
name_prompt = PromptTemplate(
template="What is a good name for a {product} company?",
input_variables=["product"]
)
tagline_prompt = PromptTemplate(
template="Write a tagline for a {product} company.",
input_variables=["product"]
)
description_prompt = PromptTemplate(
template="Write a short description for a {product} company.",
input_variables=["product"]
)
# 创建各个链
name_chain = name_prompt | llm | output_parser
tagline_chain = tagline_prompt | llm | output_parser
description_chain = description_prompt | llm | output_parser
# 创建并行链
parallel_chain = RunnableParallel(
company_name=name_chain,
tagline=tagline_chain,
description=description_chain
)使用并行链
python
# 运行并行链
response = parallel_chain.invoke({"product": "tech"})
print("Company name:", response["company_name"])
print("Tagline:", response["tagline"])
print("Description:", response["description"])组合顺序链和并行链
可以将顺序链和并行链组合在一起,构建更复杂的工作流。
初始化组合链
python
from langchain_core.runnables import RunnablePassthrough
# 第一步:生成公司名称
name_prompt = PromptTemplate(
template="What is a good name for a {product} company that makes {description}?",
input_variables=["product", "description"]
)
name_chain = name_prompt | llm | output_parser
# 第二步:并行生成标语和描述
tagline_prompt = PromptTemplate(
template="Write a tagline for a company named {company_name}.",
input_variables=["company_name"]
)
description_prompt = PromptTemplate(
template="Write a short description for a company named {company_name} that makes {description}.",
input_variables=["company_name", "description"]
)
tagline_chain = tagline_prompt | llm | output_parser
description_chain = description_prompt | llm | output_parser
# 创建组合链
def get_company_name(input_data):
return {"company_name": name_chain.invoke(input_data), "description": input_data["description"]}
parallel_chain = RunnableParallel(
tagline=lambda x: tagline_chain.invoke({"company_name": x["company_name"]}),
description=lambda x: description_chain.invoke({"company_name": x["company_name"], "description": x["description"]})
)
# 组合链
combined_chain = (
RunnableLambda(get_company_name)
| parallel_chain
)使用组合链
python
# 运行组合链
response = combined_chain.invoke({"product": "tech", "description": "smartphones"})
print("Tagline:", response["tagline"])
print("Description:", response["description"])自定义复合链
使用 LCEL 语法创建自定义复合链。
创建自定义复合链
python
from langchain_core.runnables import RunnableLambda
from typing import Dict, Any
# 创建自定义处理函数
def process_company_info(input_data: Dict[str, Any]) -> Dict[str, Any]:
"""处理公司信息,添加额外的元数据"""
company_name = input_data.get("company_name", "")
tagline = input_data.get("tagline", "")
# 添加额外的处理逻辑
return {
"company_name": company_name.strip(),
"tagline": tagline.strip(),
"full_info": f"{company_name} - {tagline}"
}
# 创建自定义 Runnable
custom_processor = RunnableLambda(process_company_info)
# 创建基础链
name_prompt = PromptTemplate(
template="What is a good name for a {product} company?",
input_variables=["product"]
)
tagline_prompt = PromptTemplate(
template="Write a tagline for a company named {company_name}.",
input_variables=["company_name"]
)
name_chain = name_prompt | llm | output_parser
# 创建组合链
def create_company_data(input_data):
company_name = name_chain.invoke(input_data)
tagline = tagline_chain.invoke({"company_name": company_name})
return {"company_name": company_name, "tagline": tagline}
custom_chain = (
RunnableLambda(create_company_data)
| custom_processor
)
# 使用自定义链
response = custom_chain.invoke({"product": "tech"})
print("Company name:", response["company_name"])
print("Tagline:", response["tagline"])
print("Full info:", response["full_info"])错误处理和重试
在复合链中添加错误处理和重试机制。
添加错误处理
python
from langchain_core.runnables import RunnableConfig
# 创建带有错误处理的链
def safe_invoke(chain, input_data, max_retries=3):
for attempt in range(max_retries):
try:
return chain.invoke(input_data)
except Exception as e:
if attempt == max_retries - 1:
return f"Error after {max_retries} attempts: {str(e)}"
print(f"Attempt {attempt + 1} failed, retrying...")
# 使用错误处理
response = safe_invoke(parallel_chain, {"product": "tech"})
print(response)总结
在 LangChain 1.2 中,使用 LCEL 语法构建复合链更加灵活和强大。主要特点包括:
- 管道操作符
|:使用管道操作符组合组件 - RunnableParallel:轻松创建并行链
- RunnableBranch:实现路由功能
- RunnableLambda:创建自定义处理逻辑
- RunnableSequence:顺序执行多个 Runnable
在实际应用中,您可以根据具体需求选择合适的复合链类型,构建功能强大的 LLM 应用。