Skip to content

复合链

复合链是将多个基础链组合在一起的链类型,它允许开发者构建更复杂的工作流。在 LangChain 1.2 中,使用 LCEL(LangChain Expression Language)语法可以更灵活地组合链。

安装依赖

在开始使用复合链之前,需要安装以下依赖:

bash
pip install langchain langchain-openai python-dotenv

什么是复合链?

复合链是由多个链组成的链,这些链按顺序或并行执行。在 LangChain 1.2 中,使用 LCEL 语法可以轻松地组合多个链,形成更复杂的工作流。

顺序链

使用 LCEL 语法将多个链按顺序组合在一起。

初始化顺序链

python
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI
from langchain_core.output_parsers import StrOutputParser

# 创建第一个提示词模板
prompt1 = PromptTemplate(
    template="What is a good name for a {product} company that makes {description}?",
    input_variables=["product", "description"]
)

# 创建第二个提示词模板
prompt2 = PromptTemplate(
    template="Write a tagline for a company named {company_name} that makes {description}.",
    input_variables=["company_name", "description"]
)

# 初始化语言模型
llm = OpenAI(api_key="your-api-key")

# 创建输出解析器
output_parser = StrOutputParser()

# 创建第一个链
chain1 = prompt1 | llm | output_parser

# 创建第二个链,使用第一个链的输出
chain2 = (
    {"company_name": chain1, "description": lambda x: x["description"]}
    | prompt2
    | llm
    | output_parser
)

使用顺序链

python
# 运行复合链
response = chain2.invoke({"product": "tech", "description": "smartphones"})
print("Response:", response)

使用 RunnableSequence

RunnableSequence 是 LCEL 中用于顺序执行多个 Runnable 的类。

初始化 RunnableSequence

python
from langchain_core.runnables import RunnableSequence

# 创建两个链
chain1 = prompt1 | llm | output_parser
chain2 = prompt2 | llm | output_parser

# 创建 RunnableSequence
# 注意:需要确保 chain1 的输出包含 chain2 所需的输入
sequential_chain = RunnableSequence(
    first=chain1,
    middle=[],
    last=chain2
)

使用 RunnableSequence

python
# 运行顺序链
response = sequential_chain.invoke({"product": "tech", "description": "smartphones"})
print(response)

路由链

在 LangChain 1.2 中,可以使用自定义逻辑实现路由功能。

初始化路由链

python
from langchain_core.runnables import RunnableBranch, RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI
from langchain_core.output_parsers import StrOutputParser

# 初始化语言模型
llm = OpenAI(api_key="your-api-key")
output_parser = StrOutputParser()

# 定义不同的提示词模板
physics_prompt = PromptTemplate(
    template="""You are a helpful assistant who knows a lot about physics.

{input}
""",
    input_variables=["input"]
)

math_prompt = PromptTemplate(
    template="""You are a helpful assistant who knows a lot about math.

{input}
""",
    input_variables=["input"]
)

default_prompt = PromptTemplate(
    template="""You are a helpful assistant.

{input}
""",
    input_variables=["input"]
)

# 创建各个链
physics_chain = physics_prompt | llm | output_parser
math_chain = math_prompt | llm | output_parser
default_chain = default_prompt | llm | output_parser

# 创建路由函数
def route_input(input_data: dict) -> str:
    input_text = input_data["input"].lower()
    if "physics" in input_text or "relativity" in input_text or "quantum" in input_text:
        return "physics"
    elif "math" in input_text or "derivative" in input_text or "equation" in input_text:
        return "math"
    else:
        return "default"

# 创建路由链
branch = RunnableBranch(
    (lambda x: route_input(x) == "physics", physics_chain),
    (lambda x: route_input(x) == "math", math_chain),
    default_chain
)

使用路由链

python
# 运行路由链
response1 = branch.invoke({"input": "What is the theory of relativity?"})
print("Physics response:", response1)

response2 = branch.invoke({"input": "What is the derivative of x^2?"})
print("Math response:", response2)

response3 = branch.invoke({"input": "What is LangChain?"})
print("Default response:", response3)

并行链

使用 RunnableParallel 同时执行多个链。

初始化并行链

python
from langchain_core.runnables import RunnableParallel

# 创建多个并行的链
name_prompt = PromptTemplate(
    template="What is a good name for a {product} company?",
    input_variables=["product"]
)

tagline_prompt = PromptTemplate(
    template="Write a tagline for a {product} company.",
    input_variables=["product"]
)

description_prompt = PromptTemplate(
    template="Write a short description for a {product} company.",
    input_variables=["product"]
)

# 创建各个链
name_chain = name_prompt | llm | output_parser
tagline_chain = tagline_prompt | llm | output_parser
description_chain = description_prompt | llm | output_parser

# 创建并行链
parallel_chain = RunnableParallel(
    company_name=name_chain,
    tagline=tagline_chain,
    description=description_chain
)

使用并行链

python
# 运行并行链
response = parallel_chain.invoke({"product": "tech"})
print("Company name:", response["company_name"])
print("Tagline:", response["tagline"])
print("Description:", response["description"])

组合顺序链和并行链

可以将顺序链和并行链组合在一起,构建更复杂的工作流。

初始化组合链

python
from langchain_core.runnables import RunnablePassthrough

# 第一步:生成公司名称
name_prompt = PromptTemplate(
    template="What is a good name for a {product} company that makes {description}?",
    input_variables=["product", "description"]
)
name_chain = name_prompt | llm | output_parser

# 第二步:并行生成标语和描述
tagline_prompt = PromptTemplate(
    template="Write a tagline for a company named {company_name}.",
    input_variables=["company_name"]
)
description_prompt = PromptTemplate(
    template="Write a short description for a company named {company_name} that makes {description}.",
    input_variables=["company_name", "description"]
)

tagline_chain = tagline_prompt | llm | output_parser
description_chain = description_prompt | llm | output_parser

# 创建组合链
def get_company_name(input_data):
    return {"company_name": name_chain.invoke(input_data), "description": input_data["description"]}

parallel_chain = RunnableParallel(
    tagline=lambda x: tagline_chain.invoke({"company_name": x["company_name"]}),
    description=lambda x: description_chain.invoke({"company_name": x["company_name"], "description": x["description"]})
)

# 组合链
combined_chain = (
    RunnableLambda(get_company_name)
    | parallel_chain
)

使用组合链

python
# 运行组合链
response = combined_chain.invoke({"product": "tech", "description": "smartphones"})
print("Tagline:", response["tagline"])
print("Description:", response["description"])

自定义复合链

使用 LCEL 语法创建自定义复合链。

创建自定义复合链

python
from langchain_core.runnables import RunnableLambda
from typing import Dict, Any

# 创建自定义处理函数
def process_company_info(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """处理公司信息,添加额外的元数据"""
    company_name = input_data.get("company_name", "")
    tagline = input_data.get("tagline", "")

    # 添加额外的处理逻辑
    return {
        "company_name": company_name.strip(),
        "tagline": tagline.strip(),
        "full_info": f"{company_name} - {tagline}"
    }

# 创建自定义 Runnable
custom_processor = RunnableLambda(process_company_info)

# 创建基础链
name_prompt = PromptTemplate(
    template="What is a good name for a {product} company?",
    input_variables=["product"]
)
tagline_prompt = PromptTemplate(
    template="Write a tagline for a company named {company_name}.",
    input_variables=["company_name"]
)

name_chain = name_prompt | llm | output_parser

# 创建组合链
def create_company_data(input_data):
    company_name = name_chain.invoke(input_data)
    tagline = tagline_chain.invoke({"company_name": company_name})
    return {"company_name": company_name, "tagline": tagline}

custom_chain = (
    RunnableLambda(create_company_data)
    | custom_processor
)

# 使用自定义链
response = custom_chain.invoke({"product": "tech"})
print("Company name:", response["company_name"])
print("Tagline:", response["tagline"])
print("Full info:", response["full_info"])

错误处理和重试

在复合链中添加错误处理和重试机制。

添加错误处理

python
from langchain_core.runnables import RunnableConfig

# 创建带有错误处理的链
def safe_invoke(chain, input_data, max_retries=3):
    for attempt in range(max_retries):
        try:
            return chain.invoke(input_data)
        except Exception as e:
            if attempt == max_retries - 1:
                return f"Error after {max_retries} attempts: {str(e)}"
            print(f"Attempt {attempt + 1} failed, retrying...")

# 使用错误处理
response = safe_invoke(parallel_chain, {"product": "tech"})
print(response)

总结

在 LangChain 1.2 中,使用 LCEL 语法构建复合链更加灵活和强大。主要特点包括:

  1. 管道操作符 |:使用管道操作符组合组件
  2. RunnableParallel:轻松创建并行链
  3. RunnableBranch:实现路由功能
  4. RunnableLambda:创建自定义处理逻辑
  5. RunnableSequence:顺序执行多个 Runnable

在实际应用中,您可以根据具体需求选择合适的复合链类型,构建功能强大的 LLM 应用。