跳转至

LangChain 流式传输

概述

LangChain 的流式传输系统可以实时展示更新,这对于构建响应迅速的 LLM 应用至关重要。通过逐步显示输出(即使在完整响应准备好之前),流式传输显著改善了用户体验,特别是在处理 LLM 的延迟时。

流式传输的优势

  • 实时反馈:用户可以看到处理进度
  • 降低感知延迟:即使总时间相同,用户体验更好
  • 调试友好:可以观察每个步骤的执行情况
  • 灵活控制:支持多种流式传输模式

基础设置

创建基础 Agent

from langchain.agents import create_agent

# 创建一个简单的工具函数
def get_weather(city: str) -> str:
    """获取指定城市的天气"""
    return f"{city}的天气是晴朗的,25°C"

# 创建 Agent
agent = create_agent(
    model="openai:gpt-4o",
    tools=[get_weather],
)

流式传输模式

1. 代理进度流 (Agent Progress)

使用 stream_mode="updates" 来流式传输代理的每个步骤进度。

def stream_agent_progress():
    """流式传输代理执行进度"""
    print("=== 代理进度流式传输 ===")

    for chunk in agent.stream(
        {"messages": [{"role": "user", "content": "北京和上海的天气怎么样?"}]},
        stream_mode="updates",  # 关键参数
    ):
        for step, data in chunk.items():
            print(f"步骤: {step}")
            if 'messages' in data and data['messages']:
                last_message = data['messages'][-1]
                print(f"内容: {last_message.content}")
                if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
                    print(f"工具调用: {last_message.tool_calls}")
            print("-" * 50)

# 调用示例
stream_agent_progress()

输出示例:

步骤: model
内容: 
工具调用: [{'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_123'}]
--------------------------------------------------
步骤: tools
内容: 北京的天气是晴朗的,25°C
--------------------------------------------------
步骤: model
内容: 北京天气晴朗,25°C。接下来查询上海天气...
工具调用: [{'name': 'get_weather', 'args': {'city': '上海'}, 'id': 'call_456'}]
--------------------------------------------------
步骤: tools
内容: 上海的天气是晴朗的,25°C
--------------------------------------------------
步骤: model
内容: 北京和上海都是晴朗天气,25°C。
--------------------------------------------------

2. LLM Token 流 (LLM Tokens)

使用 stream_mode="messages" 来流式传输 LLM 生成的每个 token。

def stream_llm_tokens():
    """流式传输 LLM 生成的 tokens"""
    print("=== LLM Token 流式传输 ===")

    for token, metadata in agent.stream(
        {"messages": [{"role": "user", "content": "上海的天气如何?"}]},
        stream_mode="messages",  # 关键参数
    ):
        node_name = metadata.get('langgraph_node', 'unknown')

        if hasattr(token, 'content_blocks') and token.content_blocks:
            for block in token.content_blocks:
                if block.get('type') == 'text' and block.get('text'):
                    print(f"[{node_name}] {block['text']}", end='', flush=True)
                elif block.get('type') == 'tool_call_chunk':
                    print(f"\n[工具调用] {block.get('name', '')} {block.get('args', '')}")

    print()  # 最终换行

# 调用示例
stream_llm_tokens()

输出示例:

[model] 让我
[model] 来查询
[model] 一下
[model] 上海
[model] 的天气
[model] ...
[工具调用] get_weather {"city":"上海"}
[tools] 上海的天气是晴朗的,25°C
[model] 上海
[model] 的天气
[model] 是晴朗的
[model] ,25°C
[model] 。

3. 自定义更新流 (Custom Updates)

在工具中使用 get_stream_writer() 来发送自定义的流式更新。

from langgraph.config import get_stream_writer

def create_custom_streaming_tool():
    """创建支持自定义流式传输的工具"""

    def search_products(query: str, max_results: int = 5) -> str:
        """搜索产品信息"""
        writer = get_stream_writer()

        # 发送自定义进度更新
        writer(f"🔍 开始搜索: {query}")
        writer(f"📊 最大结果数: {max_results}")

        # 模拟搜索过程
        writer("⏳ 连接数据库...")
        # 模拟数据库查询
        writer("✅ 数据库连接成功")

        writer("🔎 执行搜索查询...")
        # 模拟搜索逻辑
        import time
        time.sleep(0.5)

        writer(f"📦 找到 3 个相关产品")

        # 返回最终结果
        return f"搜索 '{query}' 找到 3 个产品: 产品A, 产品B, 产品C"

    return search_products

def stream_custom_updates():
    """流式传输自定义更新"""
    print("=== 自定义更新流式传输 ===")

    search_tool = create_custom_streaming_tool()
    custom_agent = create_agent(
        model="openai:gpt-4o",
        tools=[search_tool],
    )

    for chunk in custom_agent.stream(
        {"messages": [{"role": "user", "content": "搜索笔记本电脑"}]},
        stream_mode="custom"  # 关键参数
    ):
        print(f"自定义更新: {chunk}")

# 调用示例
stream_custom_updates()

输出示例:

自定义更新: 🔍 开始搜索: 笔记本电脑
自定义更新: 📊 最大结果数: 5
自定义更新: ⏳ 连接数据库...
自定义更新: ✅ 数据库连接成功
自定义更新: 🔎 执行搜索查询...
自定义更新: 📦 找到 3 个相关产品

4. 多模式流式传输

可以同时使用多种流式传输模式。

def stream_multiple_modes():
    """同时使用多种流式传输模式"""
    print("=== 多模式流式传输 ===")

    # 创建支持自定义流的工具
    def advanced_weather_tool(city: str) -> str:
        """高级天气查询工具"""
        writer = get_stream_writer()
        writer(f"🌤️  开始查询 {city} 的天气")
        writer("📡 连接气象API...")
        writer("🔍 获取实时数据...")
        return f"{city}的天气:晴朗,25°C,湿度60%"

    multi_agent = create_agent(
        model="openai:gpt-4o",
        tools=[advanced_weather_tool],
    )

    for stream_mode, chunk in multi_agent.stream(
        {"messages": [{"role": "user", "content": "查询杭州的天气"}]},
        stream_mode=["updates", "custom", "messages"]  # 多种模式
    ):
        print(f"模式: {stream_mode}")
        print(f"内容: {chunk}")
        print("-" * 30)

# 调用示例
stream_multiple_modes()

实际应用场景

场景1:实时聊天应用

import asyncio
from langchain.agents import create_agent

class StreamingChatApp:
    """支持流式传输的聊天应用"""

    def __init__(self):
        self.agent = create_agent(
            model="openai:gpt-4o",
            tools=[self.get_weather, self.search_web],
        )

    def get_weather(self, city: str) -> str:
        """获取天气信息"""
        writer = get_stream_writer()
        writer(f"查询{city}的天气...")
        # 模拟API调用
        return f"{city}: 25°C, 晴朗"

    def search_web(self, query: str) -> str:
        """网页搜索"""
        writer = get_stream_writer()
        writer(f"搜索: {query}")
        writer("正在获取最新信息...")
        return f"关于'{query}'的搜索结果..."

    async def chat_stream(self, message: str):
        """流式聊天"""
        print(f"用户: {message}")
        print("助手: ", end="", flush=True)

        full_response = ""
        for token, metadata in self.agent.stream(
            {"messages": [{"role": "user", "content": message}]},
            stream_mode="messages",
        ):
            if hasattr(token, 'content_blocks'):
                for block in token.content_blocks:
                    if block.get('type') == 'text' and block.get('text'):
                        text = block['text']
                        print(text, end='', flush=True)
                        full_response += text

        print()  # 换行
        return full_response

# 使用示例
async def demo_chat():
    app = StreamingChatApp()
    await app.chat_stream("今天杭州天气怎么样?然后搜索AI最新发展")

场景2:进度监控仪表板

from typing import Dict, Any
import json

class ProgressMonitor:
    """进度监控器"""

    def __init__(self):
        self.progress_data = {
            'total_steps': 0,
            'completed_steps': 0,
            'current_step': '',
            'details': []
        }

    def update_progress(self, step: str, details: str = ""):
        """更新进度"""
        self.progress_data['current_step'] = step
        self.progress_data['details'].append({
            'step': step,
            'details': details,
            'timestamp': str(datetime.now())
        })
        self.progress_data['completed_steps'] += 1

        # 发送到前端(模拟)
        print(f"进度更新: {json.dumps(self.progress_data, ensure_ascii=False)}")

def create_monitored_tools(monitor: ProgressMonitor):
    """创建被监控的工具"""

    def research_topic(topic: str) -> str:
        """研究主题"""
        writer = get_stream_writer()

        monitor.update_progress('research', f"开始研究: {topic}")
        writer(f"🔬 研究主题: {topic}")

        # 模拟研究步骤
        steps = [
            "收集相关资料",
            "分析关键信息", 
            "整理研究结果",
            "生成总结报告"
        ]

        for step in steps:
            monitor.update_progress('research', step)
            writer(f"✅ {step}")
            import time
            time.sleep(0.3)

        return f"关于{topic}的研究完成"

    return research_topic

def monitored_agent_demo():
    """被监控的Agent演示"""
    monitor = ProgressMonitor()
    research_tool = create_monitored_tools(monitor)

    agent = create_agent(
        model="openai:gpt-4o",
        tools=[research_tool],
    )

    print("开始监控Agent执行...")
    for stream_mode, chunk in agent.stream(
        {"messages": [{"role": "user", "content": "研究人工智能在医疗领域的应用"}]},
        stream_mode=["updates", "custom"]
    ):
        if stream_mode == "custom":
            print(f"自定义事件: {chunk}")

# 调用示例
monitored_agent_demo()

场景3:实时数据流处理

import time
from datetime import datetime

class RealTimeDataProcessor:
    """实时数据处理器"""

    def __init__(self):
        self.agent = create_agent(
            model="openai:gpt-4o",
            tools=[self.process_data_stream],
        )
        self.data_buffer = []

    def process_data_stream(self, data_type: str, count: int = 10) -> str:
        """处理数据流"""
        writer = get_stream_writer()

        writer(f"开始处理 {data_type} 数据流...")
        writer(f"预计处理 {count} 条数据")

        # 模拟数据流处理
        for i in range(count):
            # 模拟数据处理
            processed_item = f"{data_type}_item_{i+1}"
            self.data_buffer.append(processed_item)

            # 发送进度更新
            progress = (i + 1) / count * 100
            writer(f"📊 进度: {progress:.1f}% - 已处理: {processed_item}")

            # 模拟处理时间
            time.sleep(0.1)

        writer("✅ 数据流处理完成")
        return f"成功处理 {count} 条{data_type}数据"

    def start_processing(self, data_type: str):
        """开始处理"""
        print(f"开始实时处理 {data_type} 数据...")

        for stream_mode, chunk in self.agent.stream(
            {"messages": [{"role": "user", "content": f"处理{data_type}数据流"}]},
            stream_mode=["custom", "updates"]
        ):
            if stream_mode == "custom":
                print(f"{datetime.now().strftime('%H:%M:%S')} - {chunk}")

# 使用示例
processor = RealTimeDataProcessor()
processor.start_processing("传感器")

高级功能

1. 错误处理和重试

def create_robust_streaming_tool():
    """创建健壮的流式传输工具"""

    def robust_operation(operation: str) -> str:
        """健壮的操作"""
        writer = get_stream_writer()

        try:
            writer(f"🟡 开始执行: {operation}")

            # 模拟可能失败的操作
            if "fail" in operation:
                raise Exception("模拟操作失败")

            writer("🟢 操作执行中...")
            time.sleep(1)
            writer("✅ 操作完成")

            return f"操作 '{operation}' 成功完成"

        except Exception as e:
            writer(f"🔴 操作失败: {str(e)}")
            writer("🔄 尝试重试...")
            # 这里可以添加重试逻辑
            return f"操作 '{operation}' 失败: {str(e)}"

    return robust_operation

def error_handling_demo():
    """错误处理演示"""
    robust_tool = create_robust_streaming_tool()
    agent = create_agent(
        model="openai:gpt-4o",
        tools=[robust_tool],
    )

    print("测试正常操作:")
    for chunk in agent.stream(
        {"messages": [{"role": "user", "content": "执行正常操作"}]},
        stream_mode="custom"
    ):
        print(chunk)

    print("\n测试失败操作:")
    for chunk in agent.stream(
        {"messages": [{"role": "user", "content": "执行失败操作"}]},
        stream_mode="custom"
    ):
        print(chunk)

2. 性能优化

class OptimizedStreaming:
    """优化流式传输性能"""

    def __init__(self):
        self.batch_size = 5
        self.message_buffer = []

    def batch_process_tool(self, items: list) -> str:
        """批量处理工具"""
        writer = get_stream_writer()

        writer(f"🔄 开始批量处理 {len(items)} 个项目")

        for i, item in enumerate(items):
            # 处理每个项目
            writer(f"处理项目 {i+1}/{len(items)}: {item}")

            # 模拟处理
            time.sleep(0.1)

            # 每处理完一批发送更新
            if (i + 1) % self.batch_size == 0:
                writer(f"📦 已完成 {i+1} 个项目")

        writer("✅ 批量处理完成")
        return f"成功处理 {len(items)} 个项目"

    def optimized_stream_demo(self):
        """优化流式传输演示"""
        agent = create_agent(
            model="openai:gpt-4o",
            tools=[self.batch_process_tool],
        )

        items = [f"item_{i}" for i in range(1, 16)]

        for chunk in agent.stream(
            {"messages": [{"role": "user", "content": f"批量处理这些项目: {items}"}]},
            stream_mode="custom"
        ):
            print(chunk)

最佳实践

1. 选择合适的流式传输模式

def choose_stream_mode(use_case: str):
    """根据使用场景选择合适的流式传输模式"""
    mode_recommendations = {
        "chat_application": "messages",  # 聊天应用:需要实时显示文字
        "progress_tracking": ["updates", "custom"],  # 进度跟踪:需要步骤和自定义更新
        "debugging": "updates",  # 调试:需要看到每个步骤
        "data_processing": ["custom", "messages"],  # 数据处理:需要进度和结果
        "real_time_monitoring": ["updates", "custom", "messages"]  # 实时监控:全部信息
    }

    return mode_recommendations.get(use_case, "updates")

# 使用示例
chat_mode = choose_stream_mode("chat_application")
debug_mode = choose_stream_mode("debugging")

2. 处理流式传输错误

def safe_stream_invoke(agent, input_data, stream_mode="updates", max_retries=3):
    """安全的流式调用"""
    for attempt in range(max_retries):
        try:
            for chunk in agent.stream(input_data, stream_mode=stream_mode):
                yield chunk
            break  # 成功完成,退出重试循环
        except Exception as e:
            print(f"流式传输错误 (尝试 {attempt + 1}/{max_retries}): {e}")
            if attempt == max_retries - 1:
                raise  # 最后一次尝试仍然失败,抛出异常
            time.sleep(1)  # 等待后重试

# 使用示例
for chunk in safe_stream_invoke(
    agent,
    {"messages": [{"role": "user", "content": "查询天气"}]},
    stream_mode="messages"
):
    print(chunk)

故障排除

常见问题及解决方案

  1. 流式传输不工作
  2. 检查模型是否支持流式传输
  3. 确认 stream_mode 参数设置正确
  4. 验证网络连接

  5. 自定义更新不显示

  6. 确保在工具中正确使用 get_stream_writer()
  7. 检查 stream_mode 包含 "custom"
  8. 确认在 LangGraph 执行上下文中调用

  9. 性能问题

  10. 减少不必要的流式更新
  11. 使用合适的批处理大小
  12. 考虑禁用某些流式模式

总结

LangChain 的流式传输系统提供了强大的实时更新能力:

  • 多种模式:代理进度、LLM tokens、自定义更新
  • 灵活组合:可以同时使用多种流式模式
  • 实际应用:适用于聊天、监控、数据处理等场景
  • 健壮性:包含错误处理和性能优化

通过合理使用流式传输,可以显著提升应用的响应性和用户体验。