1. CLI 工具概述

LangGraph CLI 是官方的命令行界面工具,提供创建、开发、构建和部署 LangGraph 应用程序的完整工具链。它简化了从项目初始化到生产部署的整个开发流程。

1.1 核心特性

  • 项目脚手架:基于模板快速创建新项目
  • 开发服务器:热重载和调试支持的开发环境
  • Docker 集成:容器化构建和部署
  • 配置管理:灵活的项目配置系统
  • 多语言支持:Python 和 JavaScript/TypeScript

1.2 架构设计

graph TB
    subgraph "CLI 命令层"
        NEW[langgraph new]
        DEV[langgraph dev]
        UP[langgraph up]
        BUILD[langgraph build]
        DOCKERFILE[langgraph dockerfile]
    end
    
    subgraph "核心模块层"
        CLI[cli.py]
        CONFIG[config.py]
        TEMPLATES[templates.py]
        DOCKER[docker.py]
        EXEC[exec.py]
    end
    
    subgraph "工具层"
        PROGRESS[progress.py]
        ANALYTICS[analytics.py]
        UTIL[util.py]
        VERSION[version.py]
    end
    
    subgraph "外部依赖"
        CLICK[Click CLI框架]
        DOCKER_ENGINE[Docker引擎]
        LANGGRAPH_SDK[LangGraph SDK]
    end
    
    NEW --> TEMPLATES
    DEV --> EXEC
    UP --> DOCKER
    BUILD --> DOCKER
    DOCKERFILE --> DOCKER
    
    TEMPLATES --> CONFIG
    EXEC --> CONFIG
    DOCKER --> CONFIG
    
    CONFIG --> UTIL
    CLI --> PROGRESS
    CLI --> ANALYTICS
    
    CLI --> CLICK
    DOCKER --> DOCKER_ENGINE
    EXEC --> LANGGRAPH_SDK
    
    style CLI fill:#f96,stroke:#333,stroke-width:2px
    style CONFIG fill:#69f,stroke:#333,stroke-width:2px

2. CLI 主程序架构

2.1 入口点定义

# langgraph_cli/cli.py

import click
from langgraph_cli.analytics import log_command
from langgraph_cli.version import __version__

@click.group()
@click.version_option(version=__version__, prog_name="LangGraph CLI")
def cli():
    """
    LangGraph CLI 主命令组
    
    提供统一的命令入口点和版本信息
    """
    pass

# 在 pyproject.toml 中定义的入口点
[project.scripts]
langgraph = "langgraph_cli.cli:cli"

2.2 命令装饰器系统

# 通用选项装饰器
OPT_CONFIG = click.option(
    "--config",
    "-c",
    type=click.Path(
        exists=True,
        file_okay=True,
        dir_okay=False,
        resolve_path=True,
        path_type=pathlib.Path,
    ),
    default="langgraph.json",
    help="配置文件路径(默认:langgraph.json)"
)

OPT_PORT = click.option(
    "--port",
    "-p",
    type=int,
    default=DEFAULT_PORT,
    show_default=True,
    help="要暴露的端口"
)

OPT_VERBOSE = click.option(
    "--verbose",
    is_flag=True,
    default=False,
    help="显示详细的服务器日志"
)

# 装饰器组合示例
@OPT_PORT
@OPT_CONFIG
@OPT_VERBOSE
@cli.command(help="🚀 启动 LangGraph API 服务器")
@log_command
def up(port: int, config: pathlib.Path, verbose: bool):
    """up 命令的实现"""
    pass

2.3 分析和日志系统

# langgraph_cli/analytics.py

import functools
import hashlib
import json
import os
import platform
from typing import Any, Callable, Dict, Optional

def log_command(func: Callable) -> Callable:
    """
    装饰器:记录命令执行分析数据
    
    收集信息:
    - 命令名称和参数
    - 系统环境信息
    - 执行时间和结果
    - 错误信息(如有)
    """
    
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        command_name = func.__name__
        start_time = time.time()
        
        # 收集环境信息
        analytics_data = {
            "command": command_name,
            "cli_version": __version__,
            "python_version": platform.python_version(),
            "platform": platform.system(),
            "platform_version": platform.release(),
            "timestamp": datetime.utcnow().isoformat(),
        }
        
        # 匿名化用户信息
        user_id = _get_anonymous_user_id()
        analytics_data["user_id"] = user_id
        
        try:
            # 执行命令
            result = func(*args, **kwargs)
            
            # 记录成功执行
            analytics_data["success"] = True
            analytics_data["duration"] = time.time() - start_time
            
            return result
            
        except Exception as e:
            # 记录执行错误
            analytics_data["success"] = False
            analytics_data["error_type"] = type(e).__name__
            analytics_data["error_message"] = str(e)
            analytics_data["duration"] = time.time() - start_time
            
            raise
            
        finally:
            # 发送分析数据(异步,不阻塞用户)
            _send_analytics(analytics_data)
    
    return wrapper

def _get_anonymous_user_id() -> str:
    """生成匿名用户ID"""
    # 基于机器特征生成稳定的匿名ID
    machine_id = platform.node() + platform.machine()
    return hashlib.sha256(machine_id.encode()).hexdigest()[:16]

def _send_analytics(data: Dict[str, Any]) -> None:
    """异步发送分析数据"""
    if not _should_send_analytics():
        return
    
    # 在后台线程发送,避免影响用户体验
    threading.Thread(
        target=_send_analytics_sync,
        args=(data,),
        daemon=True
    ).start()

def _should_send_analytics() -> bool:
    """检查是否应该发送分析数据"""
    # 检查用户设置和环境变量
    return not os.getenv("LANGGRAPH_CLI_NO_ANALYTICS", "").lower() in ("1", "true", "yes")

3. 配置管理系统

3.1 配置文件结构

LangGraph CLI 使用 langgraph.json 作为主要配置文件:

# langgraph_cli/config.py

class Config(TypedDict):
    """LangGraph 项目配置结构"""
    
    # 必需字段
    dependencies: list[str]              # 项目依赖
    graphs: dict[str, str]               # 图定义映射
    
    # 可选字段
    python_version: Optional[str]        # Python版本要求
    node_version: Optional[str]          # Node.js版本要求
    dockerfile_lines: Optional[list[str]]  # 自定义Dockerfile行
    env: Optional[dict[str, str]]        # 环境变量
    
    # 高级配置
    store: Optional[StoreConfig]         # 存储配置
    middleware: Optional[dict[str, Any]] # 中间件配置
    api_version: Optional[str]           # API版本
    base_image: Optional[str]            # 基础镜像
    distro: Optional[Distros]            # Linux发行版选择

class StoreConfig(TypedDict, total=False):
    """存储配置"""
    
    base: Optional[str]                  # 存储后端类型
    embed: Optional[str]                 # 嵌入模型
    index: Optional[IndexConfig]         # 索引配置
    ttl: Optional[TTLConfig]             # TTL配置

class IndexConfig(TypedDict, total=False):
    """索引配置"""
    
    dims: int                            # 向量维度
    embed: str                           # 嵌入函数
    fields: Optional[list[str]]          # 索引字段

class TTLConfig(TypedDict, total=False):
    """TTL配置"""
    
    refresh_on_read: bool                # 读取时刷新TTL
    default_ttl: Optional[float]         # 默认TTL(分钟)
    sweep_interval_minutes: Optional[int]  # 清理间隔

3.2 配置加载和验证

def load_config(
    config_path: pathlib.Path = pathlib.Path("langgraph.json")
) -> Config:
    """
    加载和验证配置文件
    
    验证步骤:
    1. 文件存在性检查
    2. JSON格式验证
    3. 配置模式验证
    4. 依赖项验证
    5. 图定义验证
    """
    
    if not config_path.exists():
        raise click.ClickException(
            f"配置文件未找到:{config_path}\n"
            "运行 'langgraph new' 创建新项目。"
        )
    
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config_data = json.load(f)
    except json.JSONDecodeError as e:
        raise click.ClickException(
            f"配置文件格式错误:{config_path}\n"
            f"JSON解析错误:{e}"
        )
    
    # 验证必需字段
    if "dependencies" not in config_data:
        raise click.ClickException(
            "配置文件缺少必需字段 'dependencies'"
        )
    
    if "graphs" not in config_data:
        raise click.ClickException(
            "配置文件缺少必需字段 'graphs'"
        )
    
    # 验证图定义
    for graph_name, graph_path in config_data["graphs"].items():
        if not _validate_graph_path(graph_path):
            raise click.ClickException(
                f"无效的图路径:{graph_path} (图名:{graph_name})"
            )
    
    # 应用默认值
    config = _apply_defaults(config_data)
    
    return config

def _validate_graph_path(graph_path: str) -> bool:
    """验证图路径格式"""
    # 支持的格式:
    # - "module.py:graph_variable"
    # - "package.module:function"
    # - "relative/path/to/file.py:variable"
    
    if ":" not in graph_path:
        return False
    
    module_path, graph_name = graph_path.rsplit(":", 1)
    
    # 检查模块路径
    if module_path.endswith(".py"):
        # 文件路径
        return pathlib.Path(module_path).exists()
    else:
        # Python模块路径
        return _is_valid_python_identifier(module_path.replace(".", "_"))

def _apply_defaults(config_data: dict) -> Config:
    """应用默认配置值"""
    
    defaults = {
        "python_version": DEFAULT_PYTHON_VERSION,
        "node_version": DEFAULT_NODE_VERSION,
        "distro": DEFAULT_IMAGE_DISTRO,
        "env": {},
        "dockerfile_lines": [],
    }
    
    for key, default_value in defaults.items():
        if key not in config_data:
            config_data[key] = default_value
    
    return config_data

3.3 配置继承和覆盖

class ConfigManager:
    """
    配置管理器:处理多层配置继承和覆盖
    
    配置优先级(从高到低):
    1. 命令行参数
    2. 环境变量
    3. 项目配置文件
    4. 默认配置
    """
    
    def __init__(self, config_path: pathlib.Path):
        self.config_path = config_path
        self.base_config = load_config(config_path)
    
    def get_effective_config(
        self,
        cli_overrides: dict[str, Any] | None = None,
        env_prefix: str = "LANGGRAPH_"
    ) -> Config:
        """获取生效的配置"""
        
        # 1. 从基础配置开始
        effective_config = dict(self.base_config)
        
        # 2. 应用环境变量覆盖
        env_overrides = self._get_env_overrides(env_prefix)
        self._merge_config(effective_config, env_overrides)
        
        # 3. 应用CLI参数覆盖
        if cli_overrides:
            self._merge_config(effective_config, cli_overrides)
        
        return effective_config
    
    def _get_env_overrides(self, prefix: str) -> dict[str, Any]:
        """从环境变量获取配置覆盖"""
        overrides = {}
        
        for env_name, env_value in os.environ.items():
            if env_name.startswith(prefix):
                # 转换环境变量名为配置键
                # LANGGRAPH_PYTHON_VERSION -> python_version
                config_key = env_name[len(prefix):].lower()
                
                # 类型转换
                parsed_value = self._parse_env_value(env_value, config_key)
                overrides[config_key] = parsed_value
        
        return overrides
    
    def _parse_env_value(self, value: str, key: str) -> Any:
        """解析环境变量值的类型"""
        
        # 布尔值
        if value.lower() in ("true", "false"):
            return value.lower() == "true"
        
        # 数字
        try:
            if "." in value:
                return float(value)
            else:
                return int(value)
        except ValueError:
            pass
        
        # JSON对象/数组
        if value.startswith(("{", "[")):
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                pass
        
        # 字符串(默认)
        return value
    
    def _merge_config(self, base: dict, override: dict) -> None:
        """深度合并配置字典"""
        for key, value in override.items():
            if key in base and isinstance(base[key], dict) and isinstance(value, dict):
                self._merge_config(base[key], value)
            else:
                base[key] = value

4. 项目模板系统

4.1 模板定义和管理

# langgraph_cli/templates.py

TEMPLATES: dict[str, dict[str, str]] = {
    "New LangGraph Project": {
        "description": "一个简单的、最小的带内存的聊天机器人。",
        "python": "https://github.com/langchain-ai/new-langgraph-project/archive/refs/heads/main.zip",
        "js": "https://github.com/langchain-ai/new-langgraphjs-project/archive/refs/heads/main.zip",
    },
    "ReAct Agent": {
        "description": "一个简单的智能体,可以灵活扩展到许多工具。",
        "python": "https://github.com/langchain-ai/react-agent/archive/refs/heads/main.zip",
        "js": "https://github.com/langchain-ai/react-agent-js/archive/refs/heads/main.zip",
    },
    "Memory Agent": {
        "description": "具有额外工具的 ReAct 风格智能体,用于存储跨对话线程使用的记忆。",
        "python": "https://github.com/langchain-ai/memory-agent/archive/refs/heads/main.zip",
        "js": "https://github.com/langchain-ai/memory-agent-js/archive/refs/heads/main.zip",
    },
    "Retrieval Agent": {
        "description": "包含基于检索的问答系统的智能体。",
        "python": "https://github.com/langchain-ai/retrieval-agent-template/archive/refs/heads/main.zip",
        "js": "https://github.com/langchain-ai/retrieval-agent-template-js/archive/refs/heads/main.zip",
    },
    "Data-enrichment Agent": {
        "description": "执行网络搜索并将发现的信息组织成结构化格式的智能体。",
        "python": "https://github.com/langchain-ai/data-enrichment/archive/refs/heads/main.zip",
        "js": "https://github.com/langchain-ai/data-enrichment-js/archive/refs/heads/main.zip",
    },
}

# 生成模板ID映射
TEMPLATE_ID_TO_CONFIG = {
    f"{name.lower().replace(' ', '-')}-{lang}": (name, lang, url)
    for name, versions in TEMPLATES.items()
    for lang, url in versions.items()
    if lang in {"python", "js"}
}

4.2 模板选择和下载

def _choose_template() -> str:
    """
    向用户展示模板列表并提示选择
    
    返回值:选中模板的URL
    """
    click.secho("🌟 请选择一个模板:", bold=True, fg="yellow")
    
    for idx, (template_name, template_info) in enumerate(TEMPLATES.items(), 1):
        click.secho(f"{idx}. ", nl=False, fg="cyan")
        click.secho(template_name, fg="cyan", nl=False)
        click.secho(f" - {template_info['description']}", fg="white")
    
    # 获取用户选择,默认为第一个模板
    template_choice: Optional[int] = click.prompt(
        "输入模板选择的数字(默认为1)",
        type=int,
        default=1,
        show_default=False,
    )
    
    template_keys = list(TEMPLATES.keys())
    if 1 <= template_choice <= len(template_keys):
        selected_template: str = template_keys[template_choice - 1]
    else:
        click.secho("❌ 无效选择。请重试。", fg="red")
        return _choose_template()
    
    # 选择编程语言
    template_info = TEMPLATES[selected_template]
    if len(template_info) > 2:  # 有描述 + 多种语言
        languages = [k for k in template_info.keys() if k != "description"]
        
        click.secho(f"\n🔧 为 '{selected_template}' 选择编程语言:", bold=True, fg="yellow")
        for idx, lang in enumerate(languages, 1):
            click.secho(f"{idx}. {lang.title()}", fg="cyan")
        
        lang_choice = click.prompt(
            "输入语言选择的数字(默认为1)",
            type=int,
            default=1,
        )
        
        if 1 <= lang_choice <= len(languages):
            selected_lang = languages[lang_choice - 1]
            return template_info[selected_lang]
        else:
            click.secho("❌ 无效选择。使用默认语言。", fg="red")
            return template_info[languages[0]]
    
    return template_info["python"]  # 默认Python

def _download_and_extract_template(url: str, target_path: str) -> bool:
    """
    下载并解压模板
    
    参数:
    - url: 模板ZIP文件URL
    - target_path: 目标目录路径
    
    返回值:是否成功
    """
    try:
        click.secho(f"📥 正在下载模板从 {url}...", fg="blue")
        
        # 下载ZIP文件到内存
        with request.urlopen(url) as response:
            if response.status != 200:
                click.secho(f"❌ 下载失败:HTTP {response.status}", fg="red")
                return False
            
            zip_data = BytesIO(response.read())
        
        # 解压到目标目录
        with ZipFile(zip_data) as zip_file:
            # 获取根目录名(通常是仓库名-分支名)
            root_dir = zip_file.namelist()[0].split('/')[0]
            
            click.secho(f"📦 正在解压模板到 {target_path}...", fg="blue")
            
            for member in zip_file.namelist():
                if member.startswith(root_dir + '/'):
                    # 移除根目录前缀
                    relative_path = member[len(root_dir) + 1:]
                    if not relative_path:  # 跳过根目录本身
                        continue
                    
                    target_file = os.path.join(target_path, relative_path)
                    
                    if member.endswith('/'):
                        # 创建目录
                        os.makedirs(target_file, exist_ok=True)
                    else:
                        # 创建文件
                        os.makedirs(os.path.dirname(target_file), exist_ok=True)
                        with zip_file.open(member) as source:
                            with open(target_file, 'wb') as target:
                                shutil.copyfileobj(source, target)
        
        click.secho("✅ 模板下载和解压成功!", fg="green")
        return True
        
    except Exception as e:
        click.secho(f"❌ 模板下载失败:{e}", fg="red")
        return False

4.3 项目初始化

def create_new(path: Optional[str], template: Optional[str]) -> None:
    """
    创建新的LangGraph项目
    
    参数:
    - path: 项目路径(可选)
    - template: 模板ID(可选)
    """
    
    # 1. 确定项目路径
    if path is None:
        path = click.prompt(
            "📁 输入项目路径",
            default="./my-langgraph-project",
            show_default=True,
        )
    
    project_path = os.path.abspath(path)
    
    # 检查目录是否存在
    if os.path.exists(project_path) and os.listdir(project_path):
        if not click.confirm(
            f"目录 '{project_path}' 不为空。继续?",
            default=False
        ):
            click.secho("❌ 项目创建已取消。", fg="yellow")
            return
    
    # 2. 选择或确定模板
    if template is None:
        template_url = _choose_template()
    else:
        if template in TEMPLATE_ID_TO_CONFIG:
            _, _, template_url = TEMPLATE_ID_TO_CONFIG[template]
        else:
            click.secho(f"❌ 未知模板:{template}", fg="red")
            click.secho("可用模板:", fg="yellow")
            for template_id in TEMPLATE_ID_TO_CONFIG:
                click.secho(f"  - {template_id}", fg="cyan")
            return
    
    # 3. 创建项目目录
    os.makedirs(project_path, exist_ok=True)
    
    # 4. 下载和解压模板
    if not _download_and_extract_template(template_url, project_path):
        click.secho("❌ 项目创建失败。", fg="red")
        return
    
    # 5. 后处理:更新配置文件中的项目名称等
    _post_process_template(project_path, os.path.basename(project_path))
    
    # 6. 显示成功消息和后续步骤
    click.secho("🎉 项目创建成功!", bold=True, fg="green")
    click.secho(f"📁 项目位置:{project_path}", fg="blue")
    click.secho("\n📖 后续步骤:", bold=True, fg="yellow")
    click.secho(f"   cd {os.path.basename(project_path)}", fg="cyan")
    click.secho("   langgraph dev", fg="cyan")

def _post_process_template(project_path: str, project_name: str) -> None:
    """模板后处理:更新项目特定信息"""
    
    # 更新 langgraph.json 中的项目名称
    config_path = os.path.join(project_path, "langgraph.json")
    if os.path.exists(config_path):
        try:
            with open(config_path, 'r') as f:
                config = json.load(f)
            
            # 可以在这里添加项目特定的配置更新
            # 例如:更新图名称、添加默认环境变量等
            
            with open(config_path, 'w') as f:
                json.dump(config, f, indent=2)
                
        except Exception as e:
            click.secho(f"⚠️  配置文件更新警告:{e}", fg="yellow")
    
    # 更新 README.md 中的项目名称
    readme_path = os.path.join(project_path, "README.md")
    if os.path.exists(readme_path):
        try:
            with open(readme_path, 'r') as f:
                content = f.read()
            
            # 替换项目名称占位符
            content = content.replace("{{project_name}}", project_name)
            
            with open(readme_path, 'w') as f:
                f.write(content)
                
        except Exception as e:
            click.secho(f"⚠️  README更新警告:{e}", fg="yellow")

5. 开发服务器 (dev 命令)

5.1 开发服务器实现

@click.option("--host", default="127.0.0.1", help="网络接口绑定地址")
@click.option("--port", default=2024, type=int, help="端口号")
@click.option("--no-reload", is_flag=True, help="禁用自动重载")
@click.option("--config", type=click.Path(exists=True), default="langgraph.json")
@click.option("--debug-port", default=None, type=int, help="远程调试端口")
@click.option("--no-browser", is_flag=True, help="跳过自动打开浏览器")
@click.option("--tunnel", is_flag=True, help="通过公共隧道暴露本地服务器")
@cli.command("dev", help="🏃‍♀️‍➡️ 运行开发模式的 LangGraph API 服务器")
@log_command
def dev(
    host: str,
    port: int,
    no_reload: bool,
    config: str,
    debug_port: Optional[int],
    no_browser: bool,
    tunnel: bool,
    **kwargs
):
    """
    开发服务器命令实现
    
    特性:
    1. 热重载:监视文件变化自动重启
    2. 调试支持:集成远程调试能力
    3. 自动浏览器:启动后自动打开浏览器
    4. 隧道支持:通过Cloudflare隧道暴露服务
    """
    
    # 1. 加载配置
    config_path = pathlib.Path(config)
    app_config = load_config(config_path)
    
    # 2. 验证开发环境
    _validate_dev_environment(app_config)
    
    # 3. 设置调试
    if debug_port:
        _setup_remote_debugging(debug_port, kwargs.get("wait_for_client", False))
    
    # 4. 启动开发服务器
    runner = DevServer(
        config_path=config_path,
        host=host,
        port=port,
        reload=not no_reload,
        debug_port=debug_port,
        tunnel=tunnel,
    )
    
    try:
        # 启动服务器
        server_url = runner.start()
        
        # 打开浏览器(如果需要)
        if not no_browser and server_url:
            _open_browser(server_url)
        
        # 等待服务器运行
        runner.wait()
        
    except KeyboardInterrupt:
        click.secho("\n🛑 开发服务器已停止", fg="yellow")
    except Exception as e:
        click.secho(f"❌ 开发服务器错误:{e}", fg="red")
        sys.exit(1)

class DevServer:
    """开发服务器管理器"""
    
    def __init__(
        self,
        config_path: pathlib.Path,
        host: str = "127.0.0.1",
        port: int = 2024,
        reload: bool = True,
        debug_port: Optional[int] = None,
        tunnel: bool = False,
    ):
        self.config_path = config_path
        self.host = host
        self.port = port
        self.reload = reload
        self.debug_port = debug_port
        self.tunnel = tunnel
        
        self.process = None
        self.tunnel_process = None
        self.file_watcher = None
    
    def start(self) -> str:
        """启动开发服务器"""
        
        # 1. 构建启动命令
        cmd = self._build_command()
        
        # 2. 启动主进程
        click.secho(f"🚀 启动开发服务器 {self.host}:{self.port}...", fg="green")
        self.process = subprocess.Popen(
            cmd,
            cwd=self.config_path.parent,
            env=self._build_env(),
        )
        
        # 3. 等待服务器就绪
        server_url = self._wait_for_server()
        
        # 4. 启动隧道(如果需要)
        if self.tunnel:
            tunnel_url = self._start_tunnel(server_url)
            return tunnel_url
        
        # 5. 启动文件监视器(如果启用重载)
        if self.reload:
            self._start_file_watcher()
        
        return server_url
    
    def _build_command(self) -> list[str]:
        """构建启动命令"""
        
        cmd = [
            sys.executable, "-m", "uvicorn",
            "langgraph_api.main:app",
            "--host", self.host,
            "--port", str(self.port),
        ]
        
        if self.reload:
            cmd.extend(["--reload", "--reload-dir", "."])
        
        if self.debug_port:
            # 添加调试参数
            cmd = [
                sys.executable, "-m", "debugpy",
                "--listen", f"0.0.0.0:{self.debug_port}",
            ] + cmd[1:]  # 移除原来的python
        
        return cmd
    
    def _build_env(self) -> dict[str, str]:
        """构建环境变量"""
        env = os.environ.copy()
        
        # 设置配置文件路径
        env["LANGGRAPH_CONFIG"] = str(self.config_path)
        
        # 设置开发模式
        env["LANGGRAPH_ENV"] = "development"
        
        return env
    
    def _wait_for_server(self, timeout: int = 30) -> str:
        """等待服务器启动"""
        server_url = f"http://{self.host}:{self.port}"
        
        for _ in range(timeout):
            try:
                response = requests.get(f"{server_url}/health", timeout=1)
                if response.status_code == 200:
                    click.secho(f"✅ 服务器已就绪:{server_url}", fg="green")
                    return server_url
            except requests.RequestException:
                pass
            
            time.sleep(1)
        
        raise RuntimeError("服务器启动超时")
    
    def _start_tunnel(self, server_url: str) -> str:
        """启动Cloudflare隧道"""
        try:
            # 使用cloudflared创建隧道
            cmd = ["cloudflared", "tunnel", "--url", server_url]
            self.tunnel_process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            
            # 解析隧道URL
            for line in iter(self.tunnel_process.stdout.readline, b''):
                line = line.decode().strip()
                if "trycloudflare.com" in line:
                    tunnel_url = line.split()[-1]
                    click.secho(f"🌐 隧道已建立:{tunnel_url}", fg="cyan")
                    return tunnel_url
            
            return server_url  # 回退到本地URL
            
        except FileNotFoundError:
            click.secho("⚠️  cloudflared 未找到,跳过隧道创建", fg="yellow")
            return server_url
    
    def _start_file_watcher(self):
        """启动文件监视器"""
        from watchdog.observers import Observer
        from watchdog.events import FileSystemEventHandler
        
        class ReloadHandler(FileSystemEventHandler):
            def __init__(self, server: DevServer):
                self.server = server
                self.debounce_timer = None
            
            def on_modified(self, event):
                if event.is_directory:
                    return
                
                # 过滤文件类型
                if not event.src_path.endswith(('.py', '.json', '.yaml', '.yml')):
                    return
                
                # 防抖动重启
                if self.debounce_timer:
                    self.debounce_timer.cancel()
                
                self.debounce_timer = threading.Timer(1.0, self.server._restart)
                self.debounce_timer.start()
        
        self.file_watcher = Observer()
        self.file_watcher.schedule(
            ReloadHandler(self),
            str(self.config_path.parent),
            recursive=True
        )
        self.file_watcher.start()
    
    def _restart(self):
        """重启服务器"""
        click.secho("🔄 检测到文件变化,重启服务器...", fg="blue")
        
        if self.process:
            self.process.terminate()
            self.process.wait()
        
        # 重新启动
        self.start()
    
    def wait(self):
        """等待服务器运行"""
        if self.process:
            self.process.wait()
    
    def stop(self):
        """停止服务器"""
        if self.process:
            self.process.terminate()
            self.process.wait()
        
        if self.tunnel_process:
            self.tunnel_process.terminate()
            self.tunnel_process.wait()
        
        if self.file_watcher:
            self.file_watcher.stop()
            self.file_watcher.join()

6. Docker 集成 (up/build 命令)

6.1 Docker 能力检测

# langgraph_cli/docker.py

class DockerCapabilities(NamedTuple):
    """Docker能力检测结果"""
    
    has_docker: bool                 # 是否安装Docker
    has_compose: bool                # 是否支持Docker Compose
    buildx_available: bool           # 是否支持Buildx
    version: Optional[str]           # Docker版本
    compose_version: Optional[str]   # Compose版本

def detect_docker_capabilities() -> DockerCapabilities:
    """
    检测Docker环境能力
    
    检测项目:
    1. Docker引擎安装状态
    2. Docker Compose支持
    3. Buildx多平台构建支持
    4. 版本信息
    """
    
    has_docker = False
    has_compose = False
    buildx_available = False
    docker_version = None
    compose_version = None
    
    try:
        # 检测Docker
        result = subprocess.run(
            ["docker", "--version"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if result.returncode == 0:
            has_docker = True
            docker_version = result.stdout.strip()
        
        # 检测Docker Compose
        result = subprocess.run(
            ["docker", "compose", "version"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if result.returncode == 0:
            has_compose = True
            compose_version = result.stdout.strip()
        
        # 检测Buildx
        result = subprocess.run(
            ["docker", "buildx", "version"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if result.returncode == 0:
            buildx_available = True
            
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        pass
    
    return DockerCapabilities(
        has_docker=has_docker,
        has_compose=has_compose,
        buildx_available=buildx_available,
        version=docker_version,
        compose_version=compose_version,
    )

6.2 Dockerfile 生成

def generate_dockerfile(
    config: Config,
    config_path: pathlib.Path,
    target_path: pathlib.Path,
) -> None:
    """
    生成优化的Dockerfile
    
    生成策略:
    1. 选择合适的基础镜像
    2. 多阶段构建优化
    3. 依赖缓存优化
    4. 安全和性能考虑
    """
    
    # 1. 确定基础镜像
    base_image = _determine_base_image(config)
    
    # 2. 构建Dockerfile内容
    dockerfile_lines = [
        f"FROM {base_image} as builder",
        "",
        "# 设置工作目录",
        "WORKDIR /app",
        "",
        "# 复制依赖文件",
    ]
    
    # 3. 处理Python项目
    if _is_python_project(config):
        dockerfile_lines.extend([
            "COPY requirements.txt* pyproject.toml* poetry.lock* ./",
            "",
            "# 安装Python依赖",
            "RUN pip install --no-cache-dir --upgrade pip",
        ])
        
        # 根据依赖文件类型选择安装命令
        if (config_path.parent / "pyproject.toml").exists():
            dockerfile_lines.append("RUN pip install -e .")
        elif (config_path.parent / "requirements.txt").exists():
            dockerfile_lines.append("RUN pip install -r requirements.txt")
    
    # 4. 处理JavaScript/TypeScript项目
    elif _is_js_project(config):
        dockerfile_lines.extend([
            "COPY package.json yarn.lock* package-lock.json* ./",
            "",
            "# 安装Node.js依赖",
            "RUN npm ci --only=production || yarn install --frozen-lockfile --production",
        ])
    
    # 5. 添加应用代码
    dockerfile_lines.extend([
        "",
        "# 复制应用代码",
        "COPY . .",
        "",
        "# 运行时阶段",
        f"FROM {base_image}",
        "",
        "WORKDIR /app",
        "",
        "# 复制构建结果",
        "COPY --from=builder /app /app",
        "",
        "# 设置环境变量",
        "ENV PYTHONPATH=/app",
        "ENV LANGGRAPH_ENV=production",
        "",
    ])
    
    # 6. 添加自定义Dockerfile行
    custom_lines = config.get("dockerfile_lines", [])
    if custom_lines:
        dockerfile_lines.extend([
            "# 自定义配置",
            *custom_lines,
            "",
        ])
    
    # 7. 添加启动命令
    dockerfile_lines.extend([
        "# 暴露端口",
        "EXPOSE 8000",
        "",
        "# 健康检查",
        "HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\",
        '  CMD curl -f http://localhost:8000/health || exit 1',
        "",
        "# 启动命令",
        'CMD ["uvicorn", "langgraph_api.main:app", "--host", "0.0.0.0", "--port", "8000"]',
    ])
    
    # 8. 写入文件
    with open(target_path, 'w') as f:
        f.write('\n'.join(dockerfile_lines))
    
    click.secho(f"✅ Dockerfile已生成:{target_path}", fg="green")

def _determine_base_image(config: Config) -> str:
    """确定最佳基础镜像"""
    
    # 优先级:配置指定 > 语言检测 > 默认
    if base_image := config.get("base_image"):
        return base_image
    
    # 检测项目类型和Python版本
    python_version = config.get("python_version", DEFAULT_PYTHON_VERSION)
    distro = config.get("distro", DEFAULT_IMAGE_DISTRO)
    
    if _is_python_project(config):
        if distro == "wolfi":
            return f"cgr.dev/chainguard/python:{python_version}"
        else:
            return f"python:{python_version}-slim"
    
    elif _is_js_project(config):
        node_version = config.get("node_version", DEFAULT_NODE_VERSION)
        if distro == "wolfi":
            return f"cgr.dev/chainguard/node:{node_version}"
        else:
            return f"node:{node_version}-slim"
    
    # 默认Python镜像
    return f"python:{python_version}-slim"

6.3 Docker Compose 生成

def generate_docker_compose(
    config: Config,
    config_path: pathlib.Path,
    port: int = 8123,
    postgres_uri: Optional[str] = None,
) -> dict[str, Any]:
    """
    生成Docker Compose配置
    
    服务组成:
    1. 主应用服务
    2. PostgreSQL数据库(可选)
    3. Redis缓存(可选)
    4. 调试器服务(开发时)
    """
    
    compose_config = {
        "version": "3.8",
        "services": {},
        "volumes": {},
        "networks": {
            "langgraph": {
                "driver": "bridge"
            }
        }
    }
    
    # 1. 主应用服务
    app_service = {
        "build": {
            "context": ".",
            "dockerfile": "Dockerfile",
        },
        "ports": [f"{port}:8000"],
        "environment": _build_environment(config),
        "volumes": [
            "./:/app:cached",  # 开发时挂载源码
        ],
        "networks": ["langgraph"],
        "depends_on": [],
        "restart": "unless-stopped",
        "healthcheck": {
            "test": ["CMD", "curl", "-f", "http://localhost:8000/health"],
            "interval": "30s",
            "timeout": "10s",
            "retries": 3,
        }
    }
    
    # 2. PostgreSQL服务
    if not postgres_uri:
        postgres_service = {
            "image": "postgres:15",
            "environment": {
                "POSTGRES_DB": "langgraph",
                "POSTGRES_USER": "langgraph",
                "POSTGRES_PASSWORD": "langgraph",
            },
            "volumes": [
                "postgres_data:/var/lib/postgresql/data",
            ],
            "networks": ["langgraph"],
            "ports": ["5432:5432"],  # 开发时暴露端口
            "restart": "unless-stopped",
            "healthcheck": {
                "test": ["CMD-SHELL", "pg_isready -U langgraph"],
                "interval": "10s",
                "timeout": "5s",
                "retries": 5,
            }
        }
        
        compose_config["services"]["postgres"] = postgres_service
        compose_config["volumes"]["postgres_data"] = None
        app_service["depends_on"].append("postgres")
        app_service["environment"]["POSTGRES_URI"] = "postgresql://langgraph:langgraph@postgres:5432/langgraph"
    else:
        app_service["environment"]["POSTGRES_URI"] = postgres_uri
    
    # 3. Redis服务(如果配置需要)
    if _needs_redis(config):
        redis_service = {
            "image": "redis:7-alpine",
            "networks": ["langgraph"],
            "ports": ["6379:6379"],
            "restart": "unless-stopped",
            "healthcheck": {
                "test": ["CMD", "redis-cli", "ping"],
                "interval": "10s",
                "timeout": "3s",
                "retries": 3,
            }
        }
        
        compose_config["services"]["redis"] = redis_service
        app_service["depends_on"].append("redis")
        app_service["environment"]["REDIS_URI"] = "redis://redis:6379"
    
    compose_config["services"]["app"] = app_service
    
    return compose_config

def _build_environment(config: Config) -> dict[str, str]:
    """构建环境变量"""
    
    env = {
        "LANGGRAPH_ENV": "production",
        "PYTHONPATH": "/app",
    }
    
    # 添加配置中的环境变量
    if config_env := config.get("env"):
        env.update(config_env)
    
    return env

7. 进度显示和用户体验

7.1 进度条系统

# langgraph_cli/progress.py

import threading
import time
from typing import Optional

class Progress:
    """
    进度显示器:提供各种进度反馈形式
    
    支持的显示类型:
    1. 旋转器(spinner):未知进度的任务
    2. 进度条:已知进度的任务
    3. 多步骤:复杂任务的步骤进度
    """
    
    def __init__(
        self,
        description: str,
        total: Optional[int] = None,
        spinner_style: str = "dots",
    ):
        self.description = description
        self.total = total
        self.current = 0
        self.spinner_style = spinner_style
        
        self.running = False
        self.thread: Optional[threading.Thread] = None
        self.start_time: Optional[float] = None
        
        # 旋转器字符
        self.spinners = {
            "dots": "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏",
            "line": "-\\|/",
            "arrows": "←↖↑↗→↘↓↙",
            "bouncing": "⠁⠂⠄⠁⠂⠄",
        }
    
    def __enter__(self):
        self.start()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
        if exc_type is None:
            self._print_success()
        else:
            self._print_error()
    
    def start(self):
        """开始进度显示"""
        self.running = True
        self.start_time = time.time()
        
        if self.total is None:
            # 未知进度,显示旋转器
            self.thread = threading.Thread(target=self._spinner_worker)
        else:
            # 已知进度,显示进度条
            self.thread = threading.Thread(target=self._progress_worker)
        
        self.thread.daemon = True
        self.thread.start()
    
    def stop(self):
        """停止进度显示"""
        self.running = False
        if self.thread:
            self.thread.join(timeout=1)
        
        # 清除当前行
        print("\r" + " " * 80 + "\r", end="", flush=True)
    
    def update(self, increment: int = 1, description: Optional[str] = None):
        """更新进度"""
        self.current += increment
        if description:
            self.description = description
    
    def set_total(self, total: int):
        """设置总进度(动态)"""
        self.total = total
        
        # 如果正在运行旋转器,切换到进度条
        if self.running and self.total is not None:
            self.stop()
            self.start()
    
    def _spinner_worker(self):
        """旋转器工作线程"""
        spinner_chars = self.spinners.get(self.spinner_style, self.spinners["dots"])
        i = 0
        
        while self.running:
            char = spinner_chars[i % len(spinner_chars)]
            elapsed = time.time() - self.start_time if self.start_time else 0
            
            # 格式化输出
            output = f"\r{char} {self.description} ({elapsed:.1f}s)"
            print(output, end="", flush=True)
            
            time.sleep(0.1)
            i += 1
    
    def _progress_worker(self):
        """进度条工作线程"""
        while self.running:
            if self.total and self.total > 0:
                percent = min(100, (self.current / self.total) * 100)
                bar_length = 30
                filled = int(bar_length * percent / 100)
                bar = "█" * filled + "░" * (bar_length - filled)
                
                elapsed = time.time() - self.start_time if self.start_time else 0
                
                output = (
                    f"\r{self.description} "
                    f"[{bar}] {percent:.1f}% "
                    f"({self.current}/{self.total}) "
                    f"{elapsed:.1f}s"
                )
                print(output, end="", flush=True)
            
            time.sleep(0.1)
    
    def _print_success(self):
        """打印成功消息"""
        elapsed = time.time() - self.start_time if self.start_time else 0
        click.secho(f"✅ {self.description} 完成 ({elapsed:.1f}s)", fg="green")
    
    def _print_error(self):
        """打印错误消息"""
        elapsed = time.time() - self.start_time if self.start_time else 0
        click.secho(f"❌ {self.description} 失败 ({elapsed:.1f}s)", fg="red")

class MultiStepProgress:
    """多步骤进度显示器"""
    
    def __init__(self, steps: list[str]):
        self.steps = steps
        self.current_step = 0
        self.step_progress = {}
    
    def start_step(self, step_index: int) -> Progress:
        """开始执行某个步骤"""
        self.current_step = step_index
        step_name = self.steps[step_index]
        
        # 显示整体进度
        click.secho(
            f"📋 步骤 {step_index + 1}/{len(self.steps)}: {step_name}",
            fg="blue"
        )
        
        progress = Progress(step_name)
        self.step_progress[step_index] = progress
        return progress
    
    def complete_step(self, step_index: int, success: bool = True):
        """完成某个步骤"""
        step_name = self.steps[step_index]
        
        if success:
            click.secho(f"  ✅ {step_name}", fg="green")
        else:
            click.secho(f"  ❌ {step_name}", fg="red")
    
    def summary(self):
        """显示执行摘要"""
        completed = sum(1 for p in self.step_progress.values() if p.current > 0)
        
        click.secho(f"\n📊 执行摘要:", bold=True, fg="yellow")
        click.secho(f"   完成步骤:{completed}/{len(self.steps)}", fg="cyan")
        
        for i, step in enumerate(self.steps):
            if i in self.step_progress:
                click.secho(f"   ✅ {step}", fg="green")
            else:
                click.secho(f"   ⏸️  {step}", fg="yellow")

# 使用示例
def example_with_progress():
    """带进度显示的示例函数"""
    
    # 单步骤进度
    with Progress("下载模板") as progress:
        # 模拟工作
        for i in range(5):
            time.sleep(1)
            progress.update(description=f"下载中... {i+1}/5")
    
    # 多步骤进度
    steps = ["解析配置", "构建Docker镜像", "启动服务", "健康检查"]
    multi_progress = MultiStepProgress(steps)
    
    for i, step in enumerate(steps):
        with multi_progress.start_step(i):
            # 模拟工作
            time.sleep(2)
        
        multi_progress.complete_step(i, success=True)
    
    multi_progress.summary()

7.2 错误处理和用户友好提示

def handle_cli_error(func: Callable) -> Callable:
    """CLI错误处理装饰器"""
    
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
            
        except click.ClickException:
            # Click异常直接重新抛出
            raise
            
        except subprocess.CalledProcessError as e:
            # 子进程错误
            click.secho("❌ 命令执行失败", fg="red", bold=True)
            click.secho(f"命令:{' '.join(e.cmd)}", fg="yellow")
            click.secho(f"返回码:{e.returncode}", fg="yellow")
            
            if e.stdout:
                click.secho("标准输出:", fg="blue")
                click.echo(e.stdout.decode() if isinstance(e.stdout, bytes) else e.stdout)
            
            if e.stderr:
                click.secho("错误输出:", fg="red")
                click.echo(e.stderr.decode() if isinstance(e.stderr, bytes) else e.stderr)
            
            # 提供解决建议
            _suggest_solution(e)
            sys.exit(e.returncode)
            
        except FileNotFoundError as e:
            click.secho("❌ 文件未找到", fg="red", bold=True)
            click.secho(f"文件路径:{e.filename}", fg="yellow")
            click.secho("💡 请检查文件路径是否正确", fg="blue")
            sys.exit(1)
            
        except PermissionError as e:
            click.secho("❌ 权限不足", fg="red", bold=True)
            click.secho(f"无法访问:{e.filename}", fg="yellow")
            click.secho("💡 请检查文件权限或使用sudo运行", fg="blue")
            sys.exit(1)
            
        except json.JSONDecodeError as e:
            click.secho("❌ JSON格式错误", fg="red", bold=True)
            click.secho(f"位置:行 {e.lineno}, 列 {e.colno}", fg="yellow")
            click.secho(f"错误:{e.msg}", fg="yellow")
            click.secho("💡 请检查JSON文件格式", fg="blue")
            sys.exit(1)
            
        except Exception as e:
            # 未预期的错误
            click.secho("❌ 未知错误", fg="red", bold=True)
            click.secho(f"错误类型:{type(e).__name__}", fg="yellow")
            click.secho(f"错误信息:{str(e)}", fg="yellow")
            
            if click.confirm("是否显示详细堆栈信息?", default=False):
                import traceback
                click.secho("\n🔍 详细堆栈信息:", fg="blue", bold=True)
                traceback.print_exc()
            
            click.secho("\n💡 请报告此问题:https://github.com/langchain-ai/langgraph/issues", fg="blue")
            sys.exit(1)
    
    return wrapper

def _suggest_solution(error: subprocess.CalledProcessError):
    """根据错误类型提供解决建议"""
    
    cmd = error.cmd[0] if error.cmd else ""
    
    suggestions = {
        "docker": [
            "确保Docker已安装并运行",
            "检查Docker权限:sudo usermod -aG docker $USER",
            "尝试重启Docker服务:sudo systemctl restart docker",
        ],
        "git": [
            "确保Git已安装:sudo apt install git",
            "检查网络连接",
            "验证Git配置:git config --list",
        ],
        "npm": [
            "确保Node.js已安装:node --version",
            "清理npm缓存:npm cache clean --force",
            "尝试使用yarn:yarn install",
        ],
        "pip": [
            "升级pip:python -m pip install --upgrade pip",
            "检查Python环境:python --version",
            "使用虚拟环境:python -m venv venv && source venv/bin/activate",
        ],
    }
    
    if cmd in suggestions:
        click.secho(f"\n💡 针对 {cmd} 的解决建议:", fg="blue", bold=True)
        for suggestion in suggestions[cmd]:
            click.secho(f"   • {suggestion}", fg="blue")

8. 实用工具和最佳实践

8.1 系统检查工具

# langgraph_cli/util.py

def validate_system_requirements():
    """验证系统要求"""
    
    issues = []
    warnings = []
    
    # 检查Python版本
    current_python = platform.python_version_tuple()
    min_python = tuple(MIN_PYTHON_VERSION.split('.'))
    
    if current_python < min_python:
        issues.append(
            f"Python版本过低: {'.'.join(current_python)} < {MIN_PYTHON_VERSION}"
        )
    
    # 检查Docker
    docker_caps = detect_docker_capabilities()
    if not docker_caps.has_docker:
        warnings.append("Docker未安装,无法使用up/build命令")
    
    # 检查磁盘空间
    disk_space = shutil.disk_usage('.').free
    if disk_space < 1024 * 1024 * 1024:  # 1GB
        warnings.append("磁盘空间不足1GB,可能影响构建过程")
    
    # 检查网络连接
    if not _check_internet_connection():
        warnings.append("网络连接异常,可能影响依赖下载")
    
    # 显示结果
    if issues:
        click.secho("❌ 系统要求检查失败:", fg="red", bold=True)
        for issue in issues:
            click.secho(f"   • {issue}", fg="red")
        return False
    
    if warnings:
        click.secho("⚠️  系统要求检查警告:", fg="yellow", bold=True)
        for warning in warnings:
            click.secho(f"   • {warning}", fg="yellow")
    
    click.secho("✅ 系统要求检查通过", fg="green")
    return True

def _check_internet_connection() -> bool:
    """检查网络连接"""
    try:
        import socket
        socket.create_connection(("8.8.8.8", 53), timeout=5)
        return True
    except OSError:
        return False

def warn_non_wolfi_distro():
    """警告非Wolfi发行版的安全风险"""
    click.secho(
        "⚠️  使用非Wolfi发行版可能存在安全风险,建议在生产环境使用Wolfi镜像",
        fg="yellow"
    )

9. 总结

LangGraph CLI 是一个功能完整、用户友好的命令行工具,提供了从项目创建到生产部署的完整工具链:

9.1 核心优势

  1. 完整的开发流程:涵盖项目创建、开发、构建、部署的全生命周期
  2. 丰富的模板系统:多种预定义模板快速启动项目
  3. 强大的开发服务器:热重载、调试、隧道等开发特性
  4. Docker深度集成:自动化容器构建和编排
  5. 友好的用户体验:进度显示、错误处理、智能建议

9.2 技术特点

  1. 模块化架构:清晰的模块划分便于维护和扩展
  2. 配置管理:灵活的多层配置系统
  3. 错误处理:完善的错误捕获和用户提示
  4. 跨平台支持:支持Windows、macOS、Linux
  5. 可扩展性:插件化的模板和配置系统

LangGraph CLI 大大简化了LangGraph应用的开发和部署流程,是构建生产级AI应用不可或缺的工具。