现在位置: 首页 > LangChain 教程 > 正文

LangChain 模型调用拦截 -- @wrap_model_call

@wrap_model_call 是中间件(Middleware)中最强大的钩子。

@wrap_model_call 不像 before/after 那样只是观察,而是可以完全控制模型的执行过程——重试、降级、缓存、甚至跳过模型直接用预设回复。


理解 handler 回调

@wrap_model_call 的核心是一个 handler 回调函数。调用 handler(request) 才会真正执行模型;不调用则跳过模型。

实例

# wrap_model_call 的基本结构
# request: 包含模型、消息、工具等全部信息
# handler: 一个可调用对象,执行它才会真正调用模型

@wrap_model_call
def my_middleware(request, handler):
    # 在模型调用前可以做任何事
    print("模型即将被调用...")

    # 调用 handler(request) 才真正执行模型
    response = handler(request)

    # 在模型调用后可以做任何事
    print("模型调用完成")

    return response

场景 1:重试机制

这是最常见的场景——模型调用可能因网络问题失败,自动重试可以提高可靠性:

实例

from dotenv import load_dotenv
load_dotenv()

from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage


@wrap_model_call
def retry_on_error(request, handler):
    """模型调用失败时自动重试,最多 3 次"""
    max_retries = 3
    last_error = None

    for attempt in range(max_retries):
        try:
            result = handler(request)
            if attempt > 0:
                print(f"  [重试成功] 第 {attempt + 1} 次尝试")
            return result
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                wait_time = (attempt + 1) * 2  # 递增等待:2s, 4s
                print(f"  [重试] 第 {attempt + 1} 次失败,{wait_time}秒后重试...")
                import time
                time.sleep(wait_time)

    # 所有重试都失败了
    raise last_error


model = init_chat_model("deepseek:deepseek-v4-flash", temperature=0)
agent = create_agent(
    model=model,
    middleware=[retry_on_error],
    system_prompt="你是菜鸟教程 RUNOOB 的助手。",
)

result = agent.invoke({
    "messages": [HumanMessage(content="介绍一下菜鸟教程")]
})
print(f"\n回复: {result['messages'][-1].content[:100]}...")

场景 2:模型降级/故障转移

当主模型不可用时,自动切换到备用模型:

实例

from langchain.agents.middleware import wrap_model_call
from langchain.chat_models import init_chat_model
from langchain.messages import AIMessage


# 主模型和备用模型
primary_model = init_chat_model("deepseek:deepseek-v4-flash", temperature=0)
fallback_model = init_chat_model("deepseek:deepseek-v4-flash", temperature=0,
                                  max_tokens=100)  # 降级模型限制 Token


@wrap_model_call
def fallback_on_error(request, handler):
    """主模型失败时自动切换到备用模型"""
    try:
        # 尝试用主模型
        return handler(request)
    except Exception as e:
        print(f"[降级] 主模型失败: {e},切换到备用模型...")

        # 覆盖 request 中的 model,切换到备用模型
        request = request.override(model=fallback_model)
        try:
            return handler(request)
        except Exception as e2:
            print(f"[降级] 备用模型也失败了: {e2}")
            # 两个模型都失败,返回友好提示
            return AIMessage(
                content="抱歉,服务暂时不可用,请稍后重试。"
            )

request.override() 是一个不可变方法——它返回一个新的 request 副本,不会修改原始对象。这确保了每次调用都是独立和安全的。


场景 3:缓存模型响应

对于重复的查询,可以缓存模型响应以减少 API 调用成本:

实例

from langchain.agents.middleware import wrap_model_call
from langchain.messages import AIMessage


# 简单的内存缓存
cache = {}


@wrap_model_call
def cache_responses(request, handler):
    """缓存模型响应,相同问题不重复调用"""
    # 使用最后一条用户消息的内容作为缓存键
    messages = request.messages
    if not messages:
        return handler(request)

    # 生成缓存键
    last_content = str(messages[-1].content) if hasattr(messages[-1], 'content') else ""
    cache_key = last_content[:200]  # 截断超长内容

    # 检查缓存
    if cache_key in cache:
        print(f"[缓存命中] 直接返回缓存结果")
        cached = cache[cache_key]
        return AIMessage(content=f"{cached}\n\n*(来自缓存)*")

    # 缓存未命中,调用模型
    result = handler(request)
    # AIMessage 的 content 可能混在列表中,直接取第一个
    if hasattr(result, 'content'):
        cache[cache_key] = result.content
        print(f"[缓存未命中] 已存入缓存,当前 {len(cache)} 条")
    elif hasattr(result, 'model_response'):
        # ExtendedModelResponse 的情况
        pass

    return result


model = init_chat_model("deepseek:deepseek-v4-flash", temperature=0)
agent = create_agent(
    model=model,
    middleware=[cache_responses],
    system_prompt="你是菜鸟教程 RUNOOB 的助手,回答要简洁。",
)

# 第一次查询(缓存未命中)
result = agent.invoke({
    "messages": [{"role": "user", "content": "Python 是什么?"}]
})
print(f"第一次: {result['messages'][-1].content[:80]}...\n")

# 第二次查询相同问题(缓存命中)
result = agent.invoke({
    "messages": [{"role": "user", "content": "Python 是什么?"}]
})
print(f"第二次: {result['messages'][-1].content[:80]}...")

运行结果:

[缓存未命中] 已存入缓存,当前 1 条
第一次: Python 是一种高级编程语言,以简洁易读的语法著称...

[缓存命中] 直接返回缓存结果
第二次: Python 是一种高级编程语言,以简洁易读的语法著称...
*(来自缓存)*

场景 4:修改 request——动态注入系统消息

实例

from datetime import datetime
from langchain.agents.middleware import wrap_model_call
from langchain.messages import SystemMessage


@wrap_model_call
def inject_time_context(request, handler):
    """在每次模型调用前注入当前时间信息"""
    now = datetime.now()
    time_context = (
        f"当前时间:{now.strftime('%Y年%m月%d日 %H:%M')}。"
        f"今天是{['周一','周二','周三','周四','周五','周六','周日'][now.weekday()]}。"
    )

    # 在原有 system_message 基础上追加时间信息
    if request.system_message:
        new_content = f"{request.system_message.content}\n\n{time_context}"
    else:
        new_content = time_context

    # 用 override 创建新的 request
    new_request = request.override(
        system_message=SystemMessage(content=new_content)
    )

    return handler(new_request)

场景 5:多个 wrap_model_call 的组合

多个 wrap_model_call 中间件会自动按顺序组合——第一个在最外层:

实例

from langchain.agents.middleware import wrap_model_call


@wrap_model_call
def outer_middleware(request, handler):
    """最外层中间件"""
    print("[外层] 开始")
    result = handler(request)       # 这里会进入 inner_middleware
    print("[外层] 结束")
    return result


@wrap_model_call
def inner_middleware(request, handler):
    """内层中间件"""
    print("  [内层] 开始")
    result = handler(request)       # 这里才真正调用模型
    print("  [内层] 结束")
    return result


# 执行顺序:
# [外层] 开始
#   [内层] 开始
#     → 真正调用模型
#   [内层] 结束
# [外层] 结束

多个 wrap_model_call 就像洋葱一样层层包裹。最外层最先执行、最后返回。这让你可以组合多个独立的功能——比如外层做缓存检查,内层做重试,互不干扰。