
LangChain Models

Overview

Large language models (LLMs) are powerful AI tools that can interpret and generate text much like humans do. They are general-purpose enough to write content, translate languages, summarize, and answer questions without being specially trained for each task.

Beyond text generation, many models also support:

  • Tool calling - invoking external tools and using the results in a response
  • Structured output - constraining the model's response to follow a defined format
  • Multimodality - processing and returning data other than text, such as images, audio, and video
  • Reasoning - performing multi-step reasoning to reach a conclusion

The model is the agent's reasoning engine: it drives the agent's decision-making process. The quality and capabilities of the model you choose directly affect the agent's reliability and performance.
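
As a concrete taste, a model plugs directly into an agent loop. Here is a minimal sketch using langgraph's prebuilt ReAct agent, assuming langgraph is installed, a model initialized as in the next section, and a get_weather tool like the one defined under Tool Calling below:

from langgraph.prebuilt import create_react_agent

# The model acts as the agent's reasoning engine; tools extend its reach
agent = create_react_agent(model, tools=[get_weather])
result = agent.invoke(
    {"messages": [{"role": "user", "content": "What's the weather in Beijing?"}]}
)
print(result["messages"][-1].content)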

Basic Usage

1. Initialize a Model

Using init_chat_model (recommended)

import os
from langchain.chat_models import init_chat_model

# Set the API key
os.environ["OPENAI_API_KEY"] = "sk-..."

# Initialize the model
model = init_chat_model("openai:gpt-4o")

# Basic invocation
response = model.invoke("Why can parrots talk?")
print(response.content)

Using a model class

from langchain_openai import ChatOpenAI

# Use the model class directly
model = ChatOpenAI(
    model="gpt-4o",
    temperature=0.7,
    max_tokens=1000,
    timeout=30
)

response = model.invoke("Explain quantum computing")
print(response.content)

2. Supported Providers

# Anthropic
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(model="claude-3-5-sonnet-20241022")

# Google Gemini
from langchain_google_genai import ChatGoogleGenerativeAI
model = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp")

# Azure OpenAI
from langchain_openai import AzureChatOpenAI
model = AzureChatOpenAI(
    azure_deployment="your-deployment-name",
    openai_api_version="2023-05-15"
)

Parameter Configuration

Common Parameters

model = init_chat_model(
    "openai:gpt-4o",
    # Core parameters
    temperature=0.7,      # Controls randomness (0 = deterministic, higher = more varied)
    max_tokens=1000,      # Maximum output length
    timeout=30,           # Timeout in seconds
    max_retries=3,        # Maximum number of retries

    # Advanced parameters
    top_p=0.9,             # Nucleus sampling parameter
    frequency_penalty=0.1, # Frequency penalty
    presence_penalty=0.1,  # Presence penalty
)

response = model.invoke("Write a short story about AI")

Invocation Methods

1. Single Invocation (invoke)

# A single message
response = model.invoke("What are Python's main features?")
print(response.content)

# A conversation history
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Teach me Python"},
    {"role": "assistant", "content": "Python is a high-level programming language..."},
    {"role": "user", "content": "What are its main application areas?"}
]

response = model.invoke(messages)
print(response.content)
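
Equivalently, the conversation can be built from LangChain's message classes instead of role dicts; invoke accepts both forms:

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

messages = [
    SystemMessage("You are a helpful assistant."),
    HumanMessage("Teach me Python"),
    AIMessage("Python is a high-level programming language..."),
    HumanMessage("What are its main application areas?"),
]
response = model.invoke(messages)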

2. Streaming (stream)

print("AI回复: ", end="", flush=True)

for chunk in model.stream("解释机器学习的基本概念"):
    if hasattr(chunk, 'content'):
        print(chunk.content, end="", flush=True)
print()  # 换行

# 或者累积完整的消息
full_response = None
for chunk in model.stream("天气如何影响心情?"):
    full_response = chunk if full_response is None else full_response + chunk

print(f"\n完整回复: {full_response.content}")

3. Batch Invocation (batch)

# Basic batch processing
questions = [
    "What is artificial intelligence?",
    "Explain deep learning",
    "Application scenarios for machine learning",
    "How do neural networks work"
]

responses = model.batch(questions)
for i, response in enumerate(responses):
    print(f"Question {i+1}: {response.content[:100]}...")

# Process results as they complete (yields (index, output) pairs)
print("Output in completion order:")
for index, response in model.batch_as_completed(questions):
    print(f"Received reply: {response.content[:50]}...")

# Control concurrency
responses = model.batch(
    questions,
    config={'max_concurrency': 2}  # At most 2 concurrent requests
)
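
By default batch raises on the first failed input. Passing return_exceptions=True instead places the exception object in that slot of the result list, so the remaining answers still come back:

results = model.batch(questions, return_exceptions=True)
for question, result in zip(questions, results):
    if isinstance(result, Exception):
        print(f"Failed: {question} ({result})")
    else:
        print(f"OK: {result.content[:50]}...")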

Tool Calling

1. Binding Tools

from langchain.tools import tool

@tool
def get_weather(location: str) -> str:
    """Get the weather for a given location."""
    return f"Weather in {location}: sunny, 25°C"

@tool
def calculator(expression: str) -> str:
    """Evaluate a math expression."""
    try:
        result = eval(expression)  # demo only; eval is unsafe on untrusted input
        return f"{expression} = {result}"
    except Exception:
        return "Calculation error"

# Bind the tools to the model
model_with_tools = model.bind_tools([get_weather, calculator])

# Invoke the tool-enabled model
response = model_with_tools.invoke("What's the weather in Beijing today? Then calculate 25 * 4")
print("Tool calls:", response.tool_calls)

2. Tool Execution Loop

def execute_tool_calls(model, messages, tools):
    """Run one tool-calling round trip."""
    # The model generates tool calls
    ai_msg = model.invoke(messages)
    messages.append(ai_msg)

    # Execute every tool call
    for tool_call in ai_msg.tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]

        # Find the matching tool and run it
        for tool in tools:
            if tool.name == tool_name:
                result = tool.invoke(tool_args)
                messages.append({
                    "role": "tool",
                    "content": result,
                    "tool_call_id": tool_call["id"]
                })
                break

    # Get the final reply
    final_response = model.invoke(messages)
    return final_response

# Usage example
tools = [get_weather, calculator]
messages = [{"role": "user", "content": "What's the weather in Beijing? Then calculate 15 + 27"}]
result = execute_tool_calls(model_with_tools, messages, tools)
print("Final reply:", result.content)

3. Advanced Tool Features

# Force the use of a specific tool
forced_model = model.bind_tools(
    [get_weather],
    tool_choice="get_weather"  # Always call the weather tool
)

# Disable parallel tool calls
sequential_model = model.bind_tools(
    [get_weather, calculator],
    parallel_tool_calls=False  # Execute tools sequentially
)

# Stream tool calls
print("Streaming tool calls:")
for chunk in model_with_tools.stream("Look up the weather in Beijing and Shanghai"):
    if hasattr(chunk, 'tool_call_chunks') and chunk.tool_call_chunks:
        for tool_chunk in chunk.tool_call_chunks:
            if tool_chunk.get('name'):
                print(f"Tool: {tool_chunk['name']}")
            if tool_chunk.get('args'):
                print(f"Args: {tool_chunk['args']}")

Structured Output

1. Pydantic Models

from pydantic import BaseModel, Field
from typing import List

class Movie(BaseModel):
    """Movie information"""
    title: str = Field(description="Movie title")
    year: int = Field(description="Release year")
    director: str = Field(description="Director")
    rating: float = Field(description="Rating (0-10)")
    genres: List[str] = Field(description="List of genres")

class ProductReview(BaseModel):
    """Product review"""
    product_name: str = Field(description="Product name")
    rating: int = Field(description="Rating (1-5)")
    pros: List[str] = Field(description="Pros")
    cons: List[str] = Field(description="Cons")
    summary: str = Field(description="Summary")

# Use structured output
structured_model = model.with_structured_output(Movie)
response = structured_model.invoke("Provide details about the movie Inception")

print(f"Title: {response.title}")
print(f"Year: {response.year}")
print(f"Director: {response.director}")
print(f"Rating: {response.rating}")

2. Including the Raw Response

# Get both the parsed result and the raw message
structured_model_with_raw = model.with_structured_output(
    Movie,
    include_raw=True
)

result = structured_model_with_raw.invoke("Describe the movie Avatar")

# With include_raw=True the result is a dict with these keys
print("Parsed result:", result["parsed"])
print("Raw message:", result["raw"])
print("Parsing error:", result["parsing_error"])

3. Complex Nested Structures

from typing import Optional

class Actor(BaseModel):
    """Actor information"""
    name: str = Field(description="Actor name")
    character: str = Field(description="Character played")

class MovieDetails(BaseModel):
    """Detailed movie information"""
    title: str = Field(description="Movie title")
    year: int = Field(description="Release year")
    director: str = Field(description="Director")
    cast: List[Actor] = Field(description="Cast list")
    budget: Optional[float] = Field(default=None, description="Budget (millions USD)")
    box_office: Optional[float] = Field(default=None, description="Box office (millions USD)")

# Use the nested structure
detailed_model = model.with_structured_output(MovieDetails)
response = detailed_model.invoke("Provide complete information about Titanic")

print(f"Movie: {response.title} ({response.year})")
print(f"Director: {response.director}")
print("Cast:")
for actor in response.cast:
    print(f"  - {actor.name} as {actor.character}")

Advanced Features

1. Multimodal Processing

from langchain_core.messages import HumanMessage
import base64

def encode_image(image_path):
    """Encode an image file as base64."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')

# Build a multimodal message (content blocks wrapped in a HumanMessage)
multimodal_message = HumanMessage(content=[
    {
        "type": "text",
        "text": "Describe the contents of this image"
    },
    {
        "type": "image_url",
        "image_url": {
            "url": "data:image/jpeg;base64,..."  # replace with real base64 data, e.g. from encode_image()
        }
    }
])

# Invoke a model that supports multimodal input
response = model.invoke([multimodal_message])
print("Image description:", response.content)

2. Reasoning

# Stream the reasoning process
print("Reasoning process:")
for chunk in model.stream("Why is the sky blue?"):
    # Check for reasoning blocks
    if hasattr(chunk, 'content_blocks'):
        for block in chunk.content_blocks:
            if block.get("type") == "reasoning" and block.get("reasoning"):
                print(f"Reasoning: {block['reasoning']}")
            elif block.get("type") == "text" and block.get("text"):
                print(f"Answer: {block['text']}")

# Get the full reasoning (requires a reasoning-capable model that accepts reasoning_effort)
response = model.invoke("Explain the causes of global warming", reasoning_effort="high")
reasoning_blocks = [b for b in response.content_blocks if b.get("type") == "reasoning"]
if reasoning_blocks:
    print("Full reasoning process:")
    for block in reasoning_blocks:
        print(block.get("reasoning", ""))

3. Local Models

# Run a local model with Ollama
from langchain_ollama import ChatOllama

model = ChatOllama(
    model="qwen3:1.7b",
    temperature=0.8
)

response = model.invoke("Explain machine learning in Chinese")
print("Local model reply:", response.content)

4. Rate Limiting

from langchain_core.rate_limiters import InMemoryRateLimiter

# Create a rate limiter
rate_limiter = InMemoryRateLimiter(
    requests_per_second=1,      # 1 request per second
    check_every_n_seconds=0.1,  # Check every 100 ms
    max_bucket_size=5           # Maximum burst size
)

model_with_limiter = init_chat_model(
    "openai:gpt-4o",
    rate_limiter=rate_limiter
)

# Rate-limited calls
for i in range(3):
    response = model_with_limiter.invoke(f"Question {i+1}: What is AI?")
    print(f"Reply {i+1}: {response.content[:50]}...")

5. Token Usage Tracking

from langchain_core.callbacks import get_usage_metadata_callback

# Track token usage with a context manager
with get_usage_metadata_callback() as callback:
    response1 = model.invoke("Explain neural networks")
    response2 = model.invoke("What is deep learning")

    print("Token usage stats:")
    print(callback.usage_metadata)

# Read token info directly from a response (provider-specific metadata)
response = model.invoke("Write a Python function that computes Fibonacci numbers")
if hasattr(response, 'response_metadata'):
    usage = response.response_metadata.get('token_usage', {})
    print(f"Input tokens: {usage.get('prompt_tokens', 'N/A')}")
    print(f"Output tokens: {usage.get('completion_tokens', 'N/A')}")
    print(f"Total tokens: {usage.get('total_tokens', 'N/A')}")

6. Configurable Models

# Create a model that is configurable at runtime
configurable_model = init_chat_model(
    temperature=0,
    configurable_fields=("model", "temperature", "max_tokens")
)

# Invoke with different configurations
response1 = configurable_model.invoke(
    "Explain machine learning",
    config={"configurable": {"model": "gpt-4o", "temperature": 0.7}}
)

response2 = configurable_model.invoke(
    "Write a poem",
    config={"configurable": {"model": "gpt-4o", "temperature": 0.9}}
)

print("Technical explanation:", response1.content[:100])
print("Poem:", response2.content[:100])

Practical Application Scenarios

Scenario 1: Content Generation

class ContentGenerator:
    """Content generator"""

    def __init__(self, model):
        self.model = model

    def generate_blog_post(self, topic: str, style: str = "informative") -> str:
        """Generate a blog post"""
        prompt = f"""
        Write a blog post about {topic} in a {style} style.
        Requirements:
        1. An engaging title
        2. Clear structure (introduction, body, conclusion)
        3. Concrete examples
        4. 800-1000 words
        """

        response = self.model.invoke(prompt)
        return response.content

    def generate_social_media_post(self, topic: str, platform: str) -> str:
        """Generate a social media post"""
        platform_formats = {
            "twitter": "under 280 characters, use hashtags",
            "linkedin": "professional tone, focus on industry insights",
            "instagram": "light and fun, use emojis"
        }

        format_guide = platform_formats.get(platform, "short and punchy")
        prompt = f"Create a post about {topic} for {platform}. Requirements: {format_guide}"

        response = self.model.invoke(prompt)
        return response.content

# Usage example
generator = ContentGenerator(model)
blog_post = generator.generate_blog_post("the future of artificial intelligence", "professional")
twitter_post = generator.generate_social_media_post("machine learning", "twitter")

print("Blog post:", blog_post[:200])
print("Twitter post:", twitter_post)

Scenario 2: Data Analysis Assistant

from typing import Dict, Any

# Result schema (defined before the assistant that uses it)
class DataAnalysisResult(BaseModel):
    key_insights: List[str]
    patterns: List[str]
    recommendations: List[str]
    summary: str

class DataAnalysisAssistant:
    """Data analysis assistant"""

    def __init__(self, model):
        self.model = model
        # Configure structured output for analysis results
        self.analysis_model = model.with_structured_output(DataAnalysisResult)

    def analyze_dataset(self, data_description: str, questions: List[str]) -> Dict[str, Any]:
        """Analyze a dataset"""
        prompt = f"""
        Dataset description: {data_description}

        Please analyze this dataset and answer the following questions:
        {chr(10).join(f'{i+1}. {q}' for i, q in enumerate(questions))}

        Provide:
        - Key insights
        - Potential patterns
        - Suggestions for further analysis
        """

        response = self.analysis_model.invoke(prompt)
        return response.model_dump()

    def generate_sql_query(self, requirement: str, schema: str) -> str:
        """Generate a SQL query"""
        prompt = f"""
        Database schema: {schema}

        Requirement: {requirement}

        Please generate an optimized SQL query that satisfies this requirement,
        and explain the query's logic.
        """

        response = self.model.invoke(prompt)
        return response.content

# Usage example
assistant = DataAnalysisAssistant(model)

schema = """
users table: id, name, age, city, signup_date
orders table: id, user_id, amount, order_date, status
"""

analysis = assistant.analyze_dataset(
    "user and order data from an e-commerce platform",
    ["What is the age distribution of users?", "Which city has the most active users?", "What are the order trends?"]
)

sql_query = assistant.generate_sql_query(
    "Total order volume per city over the last 30 days",
    schema
)

print("Analysis result:", analysis)
print("SQL query:", sql_query)

Scenario 3: Code Assistant

class CodeAssistant:
    """Code assistant"""

    def __init__(self, model):
        self.model = model

    def explain_code(self, code: str, language: str = "python") -> str:
        """Explain code"""
        prompt = f"""
        Please explain the following {language} code:

        ```{language}
        {code}
        ```

        The explanation should cover:
        1. What the code does
        2. The key logical steps
        3. Possible improvements
        """

        response = self.model.invoke(prompt)
        return response.content

    def debug_code(self, code: str, error: str, language: str = "python") -> str:
        """Debug code"""
        prompt = f"""
        Please help debug the following {language} code:

        ```{language}
        {code}
        ```

        Error message: {error}

        Please provide:
        1. An analysis of the cause of the error
        2. Suggested fixes
        3. The corrected code
        """

        response = self.model.invoke(prompt)
        return response.content

    def generate_test_cases(self, code: str, language: str = "python") -> str:
        """Generate test cases"""
        prompt = f"""
        Generate test cases for the following {language} code:

        ```{language}
        {code}
        ```

        Include:
        1. Normal cases
        2. Edge cases
        3. Error cases
        """

        response = self.model.invoke(prompt)
        return response.content

# Usage example
code_assistant = CodeAssistant(model)

sample_code = """
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)
"""

explanation = code_assistant.explain_code(sample_code)
test_cases = code_assistant.generate_test_cases(sample_code)

print("Code explanation:", explanation)
print("Test cases:", test_cases)

Best Practices

1. Error Handling

from typing import List
from tenacity import retry, stop_after_attempt, wait_exponential

class RobustModelClient:
    """A robust model client"""

    def __init__(self, model):
        self.model = model

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def invoke_with_retry(self, prompt: str, **kwargs):
        """Invoke with retries"""
        try:
            return self.model.invoke(prompt, **kwargs)
        except Exception as e:
            print(f"Call failed: {e}, retrying...")
            raise

    def safe_batch_process(self, prompts: List[str], batch_size: int = 5):
        """Batch processing with error isolation"""
        results = []

        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i + batch_size]
            try:
                batch_results = self.model.batch(
                    batch,
                    config={'max_concurrency': 2}
                )
                results.extend(batch_results)
                print(f"Completed batch {i//batch_size + 1}")
            except Exception as e:
                print(f"Batch {i//batch_size + 1} failed: {e}")
                # Retry logic could be added here

        return results

# Usage example
robust_client = RobustModelClient(model)

try:
    response = robust_client.invoke_with_retry(
        "Explain quantum mechanics",
        temperature=0.7
    )
    print("Successfully received a response")
except Exception as e:
    print(f"All retries failed: {e}")

2. Performance Optimization

import time
from typing import List

class OptimizedModelHandler:
    """An optimized model handler"""

    def __init__(self, model):
        self.model = model
        self.response_cache = {}

    def cached_invoke(self, prompt: str, temperature: float = 0.7) -> str:
        """Invoke with a simple in-memory cache"""
        cache_key = (prompt, temperature)

        if cache_key in self.response_cache:
            return self.response_cache[cache_key]

        start_time = time.time()
        response = self.model.invoke(prompt, temperature=temperature)
        execution_time = time.time() - start_time

        self.response_cache[cache_key] = response.content
        print(f"New request - took {execution_time:.2f}s")

        return response.content

    def batch_optimized(self, prompts: List[str], **kwargs):
        """Batch processing with deduplication"""
        # Deduplicate
        unique_prompts = list(set(prompts))

        # Batch-process the unique prompts
        unique_responses = self.model.batch(unique_prompts, **kwargs)

        # Build a prompt -> response map
        response_map = {prompt: resp.content for prompt, resp in zip(unique_prompts, unique_responses)}

        # Return results in the original order
        return [response_map[prompt] for prompt in prompts]

# Usage example
optimized_handler = OptimizedModelHandler(model)

# Repeated requests hit the cache
for i in range(3):
    result = optimized_handler.cached_invoke("What is Python?")
    print(f"Request {i+1}: {result[:50]}...")

3. Cost Control

class CostAwareModelClient:
    """A cost-aware model client"""

    def __init__(self, model, budget_limit: int = 1000000):  # 1M-token budget
        self.model = model
        self.budget_limit = budget_limit
        self.tokens_used = 0
        self.requests_count = 0

    def track_usage(self, response):
        """Track token usage"""
        if hasattr(response, 'response_metadata'):
            usage = response.response_metadata.get('token_usage', {})
            tokens = usage.get('total_tokens', 0)
            self.tokens_used += tokens
            self.requests_count += 1

            print(f"This call: {tokens} tokens")
            print(f"Cumulative: {self.tokens_used}/{self.budget_limit} tokens")

            if self.tokens_used >= self.budget_limit:
                print("Warning: approaching the budget limit!")

    def invoke_with_budget(self, prompt: str, **kwargs):
        """Invoke with budget enforcement"""
        if self.tokens_used >= self.budget_limit:
            raise Exception("Budget limit exceeded")

        response = self.model.invoke(prompt, **kwargs)
        self.track_usage(response)
        return response

    def get_usage_stats(self):
        """Get usage statistics"""
        return {
            'tokens_used': self.tokens_used,
            'requests_count': self.requests_count,
            'budget_remaining': self.budget_limit - self.tokens_used,
            'utilization_percentage': (self.tokens_used / self.budget_limit) * 100
        }

# Usage example
cost_aware_client = CostAwareModelClient(model, budget_limit=5000)  # 5000-token test budget

try:
    response1 = cost_aware_client.invoke_with_budget("Explain machine learning")
    response2 = cost_aware_client.invoke_with_budget("What is deep learning")

    stats = cost_aware_client.get_usage_stats()
    print("Usage stats:", stats)

except Exception as e:
    print(f"Call failed: {e}")

Summary

LangChain Models provide a powerful and flexible way to work with a wide range of large language models:

  • Multi-provider support: OpenAI, Anthropic, Google, Azure, and more
  • Multiple invocation modes: single, streaming, and batch calls
  • Tool integration: bind and execute external tools
  • Structured output: ensure responses match the expected format
  • Advanced features: multimodality, reasoning, local deployment, and more
  • Production readiness: error handling, performance optimization, cost control

Used well, these capabilities let you build powerful, reliable, and cost-effective AI applications.