概述

Dify的工作流模块(core/workflow/)是平台的可视化业务流程引擎,支持通过拖拽节点的方式构建复杂的AI应用流程。该模块实现了有向无环图(DAG)的执行引擎,支持20+种节点类型、并行处理、循环控制和条件分支,为用户提供强大的可视化编程能力。

蜂巢架构设计理念: 根据Dify 1.8.0的架构分析,Dify采用了**蜂巢架构(Beehive Architecture)**设计理念:

  • 模块独立性:每个功能模块如蜂巢中的独立单元,可单独升级或替换
  • 松耦合设计:模块间通过标准接口通信,降低依赖性
  • 水平扩展性:新节点类型和功能可无缝集成到现有架构中
  • 故障隔离:单个模块的故障不会影响整个系统的运行

事件驱动执行模式: 工作流执行采用事件驱动架构,通过TaskPipeline实现:

  • 异步事件处理:节点执行、状态变更、错误处理都通过事件机制传递
  • 流式处理支持:支持流式输出,提升用户体验
  • 状态持久化:执行状态实时保存,支持断点续传和故障恢复

本文将深入分析该模块的架构设计、执行机制和关键实现。

1. 工作流模块整体架构

1.1 核心架构图

graph TB subgraph "工作流引擎层" GraphEngine[GraphEngine 图执行引擎] WorkflowEntry[WorkflowEntry 工作流入口] ThreadPool[ThreadPool 线程池] end subgraph "图结构层" Graph[Graph 图结构] GraphEdge[GraphEdge 图边] GraphParallel[GraphParallel 并行分支] end subgraph "节点系统层" BaseNode[BaseNode 基础节点] subgraph "控制节点" StartNode[StartNode 开始节点] EndNode[EndNode 结束节点] IfElseNode[IfElseNode 条件分支] LoopNode[LoopNode 循环节点] IterationNode[IterationNode 迭代节点] end subgraph "处理节点" LLMNode[LLMNode 语言模型节点] ToolNode[ToolNode 工具节点] CodeNode[CodeNode 代码执行节点] KnowledgeRetrievalNode[KnowledgeRetrievalNode 知识检索节点] HTTPRequestNode[HTTPRequestNode HTTP请求节点] end subgraph "数据节点" VariableAssignerNode[VariableAssignerNode 变量赋值节点] VariableAggregatorNode[VariableAggregatorNode 变量聚合节点] TemplateTransformNode[TemplateTransformNode 模板转换节点] ParameterExtractorNode[ParameterExtractorNode 参数提取节点] end subgraph "输出节点" AnswerNode[AnswerNode 答案节点] QuestionClassifierNode[QuestionClassifierNode 问题分类节点] DocumentExtractorNode[DocumentExtractorNode 文档提取节点] ListOperatorNode[ListOperatorNode 列表操作节点] end end subgraph "状态管理层" GraphRuntimeState[GraphRuntimeState 图运行时状态] VariablePool[VariablePool 变量池] NodeExecutionState[NodeExecutionState 节点执行状态] end subgraph "事件系统层" EventBus[EventBus 事件总线] GraphEngineEvent[GraphEngineEvent 图引擎事件] NodeEvent[NodeEvent 节点事件] StreamProcessor[StreamProcessor 流处理器] end WorkflowEntry --> GraphEngine GraphEngine --> Graph GraphEngine --> ThreadPool Graph --> GraphEdge Graph --> GraphParallel BaseNode --> StartNode BaseNode --> LLMNode BaseNode --> ToolNode BaseNode --> AnswerNode GraphEngine --> GraphRuntimeState GraphRuntimeState --> VariablePool GraphRuntimeState --> NodeExecutionState GraphEngine --> EventBus EventBus --> GraphEngineEvent EventBus --> NodeEvent style GraphEngine fill:#e3f2fd style BaseNode fill:#e8f5e8 style GraphRuntimeState fill:#fff3e0 style EventBus fill:#fce4ec

1.2 工作流执行流程

sequenceDiagram participant Client as 客户端 participant WorkflowEntry as 工作流入口 participant GraphEngine as 图执行引擎 participant NodeFactory as 节点工厂 participant BaseNode as 节点实例 participant VariablePool as 变量池 participant EventBus as 事件总线 Note over Client,EventBus: 工作流执行完整流程 Client->>WorkflowEntry: 启动工作流执行 WorkflowEntry->>GraphEngine: 初始化图执行引擎 GraphEngine->>GraphEngine: 加载工作流图结构 loop 节点执行循环 GraphEngine->>NodeFactory: 创建节点实例 NodeFactory->>BaseNode: 实例化具体节点 GraphEngine->>VariablePool: 获取节点输入变量 VariablePool-->>GraphEngine: 返回变量值 GraphEngine->>BaseNode: 执行节点逻辑 BaseNode->>BaseNode: 处理节点业务逻辑 alt 节点执行成功 BaseNode-->>GraphEngine: 返回执行结果 GraphEngine->>VariablePool: 更新输出变量 GraphEngine->>EventBus: 发布成功事件 else 节点执行失败 BaseNode-->>GraphEngine: 返回错误信息 GraphEngine->>EventBus: 发布失败事件 alt 有错误处理策略 GraphEngine->>GraphEngine: 执行错误分支 else 无错误处理 GraphEngine->>GraphEngine: 终止工作流 end end GraphEngine->>GraphEngine: 计算下一个节点 alt 有下一个节点 Note over GraphEngine: 继续执行下一节点 else 到达结束节点 Note over GraphEngine: 工作流执行完成 end end GraphEngine-->>WorkflowEntry: 返回执行结果 WorkflowEntry-->>Client: 返回最终结果

2. 图执行引擎深度解析

2.1 GraphEngine核心实现

   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
class GraphEngine:
    """
    图执行引擎
    负责工作流图的解析、调度和执行
    """
    
    # 全局线程池映射,支持多个工作流并发执行
    workflow_thread_pool_mapping: dict[str, GraphEngineThreadPool] = {}

    def __init__(
        self,
        tenant_id: str,
        app_id: str,
        workflow_type: WorkflowType,
        workflow_id: str,
        user_id: str,
        user_from: UserFrom,
        invoke_from: InvokeFrom,
        call_depth: int,
        graph: Graph,
        graph_config: Mapping[str, Any],
        graph_runtime_state: GraphRuntimeState,
        max_execution_steps: int,
        max_execution_time: int,
        thread_pool_id: Optional[str] = None,
    ):
        """
        初始化图执行引擎
        
        Args:
            tenant_id: 租户ID
            app_id: 应用ID
            workflow_type: 工作流类型
            workflow_id: 工作流ID
            user_id: 用户ID
            user_from: 用户来源
            invoke_from: 调用来源
            call_depth: 调用深度(用于防止无限递归)
            graph: 图结构实例
            graph_config: 图配置
            graph_runtime_state: 图运行时状态
            max_execution_steps: 最大执行步数
            max_execution_time: 最大执行时间(秒)
            thread_pool_id: 可选的线程池ID,用于复用线程池
        """
        # 线程池配置
        thread_pool_max_submit_count = dify_config.MAX_SUBMIT_COUNT
        thread_pool_max_workers = 10

        # 初始化线程池
        if thread_pool_id:
            # 复用现有线程池
            if thread_pool_id not in GraphEngine.workflow_thread_pool_mapping:
                raise ValueError(f"线程池 {thread_pool_id} 不存在")
            
            self.thread_pool_id = thread_pool_id
            self.thread_pool = GraphEngine.workflow_thread_pool_mapping[thread_pool_id]
            self.is_main_thread_pool = False
        else:
            # 创建新的线程池
            self.thread_pool = GraphEngineThreadPool(
                max_workers=thread_pool_max_workers,
                max_submit_count=thread_pool_max_submit_count
            )
            self.thread_pool_id = str(uuid.uuid4())
            self.is_main_thread_pool = True
            GraphEngine.workflow_thread_pool_mapping[self.thread_pool_id] = self.thread_pool

        # 设置核心属性
        self.graph = graph
        self.init_params = GraphInitParams(
            tenant_id=tenant_id,
            app_id=app_id,
            workflow_type=workflow_type,
            workflow_id=workflow_id,
            graph_config=graph_config,
            user_id=user_id,
            user_from=user_from,
            invoke_from=invoke_from,
            call_depth=call_depth,
        )
        
        self.graph_runtime_state = graph_runtime_state
        self.max_execution_steps = max_execution_steps
        self.max_execution_time = max_execution_time

    def run(self) -> Generator[GraphEngineEvent, None, None]:
        """
        运行图执行引擎
        从根节点开始执行整个工作流图
        
        Yields:
            GraphEngineEvent: 图执行过程中的各种事件
        """
        try:
            # 发布图开始执行事件
            yield GraphRunStartedEvent(
                graph_run_id=self.graph_runtime_state.graph_run_id,
                graph_id=self.graph.root_node_id,
                started_at=naive_utc_now()
            )

            # 从根节点开始执行
            yield from self._run(start_node_id=self.graph.root_node_id)

            # 发布图执行成功事件
            yield GraphRunSucceededEvent(
                graph_run_id=self.graph_runtime_state.graph_run_id,
                graph_id=self.graph.root_node_id,
                outputs=self.graph_runtime_state.variable_pool.to_dict(),
                succeeded_at=naive_utc_now()
            )

        except GraphRunFailedError as e:
            # 图执行失败
            logger.error(f"工作流图执行失败: {e}")
            yield GraphRunFailedEvent(
                graph_run_id=self.graph_runtime_state.graph_run_id,
                graph_id=self.graph.root_node_id,
                error=str(e),
                failed_at=naive_utc_now()
            )
            raise e

        except Exception as e:
            # 未预期的异常
            logger.exception(f"工作流图执行出现未预期异常: {e}")
            yield GraphRunFailedEvent(
                graph_run_id=self.graph_runtime_state.graph_run_id,
                graph_id=self.graph.root_node_id,
                error=f"未预期异常: {str(e)}",
                failed_at=naive_utc_now()
            )
            raise e

        finally:
            # 清理资源
            if self.is_main_thread_pool:
                self._release_thread()

    def _run(
        self,
        start_node_id: str,
        in_parallel_id: Optional[str] = None,
        parent_parallel_id: Optional[str] = None,
        parent_parallel_start_node_id: Optional[str] = None,
        handle_exceptions: list[str] = [],
    ) -> Generator[GraphEngineEvent, None, None]:
        """
        内部执行方法
        实现图的深度优先遍历执行
        
        Args:
            start_node_id: 起始节点ID
            in_parallel_id: 当前并行分支ID
            parent_parallel_id: 父并行分支ID
            parent_parallel_start_node_id: 父并行起始节点ID
            handle_exceptions: 处理的异常列表
            
        Yields:
            GraphEngineEvent: 执行事件
        """
        parallel_start_node_id = None
        if in_parallel_id:
            parallel_start_node_id = start_node_id

        next_node_id = start_node_id
        previous_route_node_state: Optional[RouteNodeState] = None

        # 主执行循环
        while True:
            # 检查最大执行步数限制
            if self.graph_runtime_state.node_run_steps > self.max_execution_steps:
                raise GraphRunFailedError(f"超过最大执行步数 {self.max_execution_steps}")

            # 检查最大执行时间限制
            if self._is_timed_out(
                start_at=self.graph_runtime_state.start_at,
                max_execution_time=self.max_execution_time
            ):
                raise GraphRunFailedError(f"超过最大执行时间 {self.max_execution_time}秒")

            # 创建节点路由状态
            route_node_state = self.graph_runtime_state.node_run_state.create_node_state(
                node_id=next_node_id
            )

            # 获取节点配置
            node_id = route_node_state.node_id
            node_config = self.graph.node_id_config_mapping.get(node_id)
            if not node_config:
                raise GraphRunFailedError(f"节点 {node_id} 配置未找到")

            # 确定节点类型和版本
            node_type = NodeType(node_config.get("data", {}).get("type"))
            node_version = node_config.get("data", {}).get("version", "1")

            # 从节点映射中获取节点类
            from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
            node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version]

            previous_node_id = (
                previous_route_node_state.node_id 
                if previous_route_node_state else None
            )

            # 创建节点实例
            node_instance = node_cls(
                id=node_id,
                config=node_config,
                graph_init_params=self.init_params,
                graph=self.graph,
                graph_runtime_state=self.graph_runtime_state,
                previous_node_id=previous_node_id,
                thread_pool_id=self.thread_pool_id,
            )

            # 初始化节点数据
            node_instance.init_node_data(node_config.get("data", {}))

            # 执行节点
            yield from self._run_node(
                node_instance=node_instance,
                route_node_state=route_node_state,
                in_parallel_id=in_parallel_id,
                parent_parallel_id=parent_parallel_id,
                parent_parallel_start_node_id=parent_parallel_start_node_id,
            )

            # 获取执行结果
            node_run_result = route_node_state.node_run_result
            if not node_run_result:
                raise GraphRunFailedError(f"节点 {node_id} 执行结果为空")

            # 处理执行状态
            if node_run_result.status == WorkflowNodeExecutionStatus.FAILED:
                # 节点执行失败,检查是否有错误处理策略
                if self._should_handle_error(node_instance, handle_exceptions):
                    # 执行错误处理分支
                    yield from self._handle_node_error(
                        node_instance=node_instance,
                        error=node_run_result.error,
                        route_node_state=route_node_state
                    )
                else:
                    # 没有错误处理,抛出异常
                    raise GraphRunFailedError(
                        f"节点 {node_id} 执行失败: {node_run_result.error}"
                    )

            # 确定下一个节点
            next_node_candidates = self._get_next_node_candidates(
                node_instance=node_instance,
                route_node_state=route_node_state
            )

            if not next_node_candidates:
                # 没有下一个节点,执行结束
                break

            # 处理多个候选节点(并行执行)
            if len(next_node_candidates) > 1:
                yield from self._handle_parallel_execution(
                    candidates=next_node_candidates,
                    current_node_id=node_id,
                    in_parallel_id=in_parallel_id
                )
                break
            else:
                # 单个下一节点,继续串行执行
                next_node_id = next_node_candidates[0]
                previous_route_node_state = route_node_state

    def _run_node(
        self,
        node_instance: BaseNode,
        route_node_state: RouteNodeState,
        in_parallel_id: Optional[str] = None,
        parent_parallel_id: Optional[str] = None,
        parent_parallel_start_node_id: Optional[str] = None,
    ) -> Generator[GraphEngineEvent, None, None]:
        """
        执行单个节点
        包含节点的完整生命周期管理
        
        Args:
            node_instance: 节点实例
            route_node_state: 路由节点状态
            in_parallel_id: 并行分支ID
            parent_parallel_id: 父并行分支ID
            parent_parallel_start_node_id: 父并行起始节点ID
            
        Yields:
            GraphEngineEvent: 节点执行事件
        """
        node_id = node_instance.node_id
        node_type = node_instance._node_type
        
        # 发布节点开始执行事件
        yield NodeRunStartedEvent(
            node_execution_id=route_node_state.node_execution_id,
            node_id=node_id,
            node_type=node_type,
            parallel_id=in_parallel_id,
            parallel_start_node_id=parent_parallel_start_node_id,
            started_at=naive_utc_now()
        )

        try:
            # 准备节点输入
            node_inputs = self._prepare_node_inputs(node_instance)
            
            # 执行节点逻辑
            node_run_generator = node_instance.run(
                node_inputs=node_inputs
            )

            # 处理节点执行过程中的事件
            for node_event in node_run_generator:
                if isinstance(node_event, RunStreamChunkEvent):
                    # 流式输出事件
                    yield NodeRunStreamChunkEvent(
                        node_execution_id=route_node_state.node_execution_id,
                        node_id=node_id,
                        node_type=node_type,
                        chunk_content=node_event.chunk,
                        parallel_id=in_parallel_id,
                        parallel_start_node_id=parent_parallel_start_node_id,
                    )
                
                elif isinstance(node_event, RunRetrieverResourceEvent):
                    # 检索资源事件
                    yield NodeRunRetrieverResourceEvent(
                        node_execution_id=route_node_state.node_execution_id,
                        node_id=node_id,
                        node_type=node_type,
                        retriever_resources=node_event.retriever_resources,
                        parallel_id=in_parallel_id,
                        parallel_start_node_id=parent_parallel_start_node_id,
                    )
                
                elif isinstance(node_event, RunCompletedEvent):
                    # 节点执行完成事件
                    route_node_state.node_run_result = node_event.run_result
                    
                    # 更新变量池
                    if node_event.run_result.outputs:
                        self.graph_runtime_state.variable_pool.add(
                            variables=node_event.run_result.outputs
                        )
                    
                    # 发布节点成功事件
                    yield NodeRunSucceededEvent(
                        node_execution_id=route_node_state.node_execution_id,
                        node_id=node_id,
                        node_type=node_type,
                        inputs=node_event.run_result.inputs or {},
                        outputs=node_event.run_result.outputs or {},
                        execution_metadata=node_event.run_result.metadata or {},
                        succeeded_at=naive_utc_now(),
                        parallel_id=in_parallel_id,
                        parallel_start_node_id=parent_parallel_start_node_id,
                    )
                    break

            # 增加执行步数计数
            self.graph_runtime_state.node_run_steps += 1

        except NodeRunFailedException as e:
            # 节点执行失败
            route_node_state.node_run_result = NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                error=str(e)
            )
            
            yield NodeRunFailedEvent(
                node_execution_id=route_node_state.node_execution_id,
                node_id=node_id,
                node_type=node_type,
                error=str(e),
                failed_at=naive_utc_now(),
                parallel_id=in_parallel_id,
                parallel_start_node_id=parent_parallel_start_node_id,
            )

        except Exception as e:
            # 未预期异常
            logger.exception(f"节点 {node_id} 执行时发生未预期异常")
            
            route_node_state.node_run_result = NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                error=f"未预期异常: {str(e)}"
            )
            
            yield NodeRunExceptionEvent(
                node_execution_id=route_node_state.node_execution_id,
                node_id=node_id,
                node_type=node_type,
                exception=e,
                failed_at=naive_utc_now(),
                parallel_id=in_parallel_id,
                parallel_start_node_id=parent_parallel_start_node_id,
            )

    def _prepare_node_inputs(self, node_instance: BaseNode) -> dict[str, Any]:
        """
        准备节点输入变量
        从变量池中提取节点所需的输入变量
        
        Args:
            node_instance: 节点实例
            
        Returns:
            dict[str, Any]: 节点输入变量字典
        """
        node_inputs = {}
        
        # 获取节点的输入变量配置
        if hasattr(node_instance, '_node_data') and hasattr(node_instance._node_data, 'inputs'):
            for input_config in node_instance._node_data.inputs:
                variable_selector = input_config.variable_selector
                
                try:
                    # 从变量池获取变量值
                    variable_value = self.graph_runtime_state.variable_pool.get(
                        variable_selector
                    )
                    
                    node_inputs[input_config.variable] = variable_value
                    
                except VariableNotFoundError:
                    # 变量不存在,检查是否为必填
                    if input_config.required:
                        raise VariableNotFoundError(
                            f"节点 {node_instance.node_id} 的必填变量 "
                            f"{variable_selector} 未找到"
                        )
                    else:
                        # 非必填变量,使用默认值
                        node_inputs[input_config.variable] = input_config.default_value

        return node_inputs

    def _get_next_node_candidates(
        self,
        node_instance: BaseNode,
        route_node_state: RouteNodeState
    ) -> list[str]:
        """
        获取下一个节点候选列表
        根据节点执行结果和图结构确定后续节点
        
        Args:
            node_instance: 当前节点实例
            route_node_state: 路由节点状态
            
        Returns:
            list[str]: 下一个节点ID列表
        """
        node_id = node_instance.node_id
        node_run_result = route_node_state.node_run_result
        
        if not node_run_result:
            return []

        # 获取节点的出边
        outgoing_edges = self.graph.edge_mapping.get(node_id, [])
        
        if not outgoing_edges:
            # 没有出边,执行结束
            return []

        next_nodes = []
        
        for edge in outgoing_edges:
            # 检查边的条件(如果有)
            if self._should_follow_edge(edge, node_run_result):
                next_nodes.append(edge.target_node_id)

        return next_nodes

    def _should_follow_edge(self, edge: GraphEdge, node_run_result: NodeRunResult) -> bool:
        """
        判断是否应该遵循某条边
        评估边的条件表达式
        
        Args:
            edge: 图边对象
            node_run_result: 节点执行结果
            
        Returns:
            bool: 是否应该遵循该边
        """
        # 如果边没有条件,直接遵循
        if not edge.condition:
            return True

        try:
            # 使用条件管理器评估条件
            condition_manager = ConditionManager()
            
            return condition_manager.evaluate(
                condition=edge.condition,
                variable_pool=self.graph_runtime_state.variable_pool,
                node_outputs=node_run_result.outputs or {}
            )
            
        except Exception as e:
            logger.warning(f"边条件评估失败: {e}")
            return False

    def _handle_parallel_execution(
        self,
        candidates: list[str],
        current_node_id: str,
        in_parallel_id: Optional[str]
    ) -> Generator[GraphEngineEvent, None, None]:
        """
        处理并行执行
        当一个节点有多个后续节点时,创建并行分支执行
        
        Args:
            candidates: 候选节点ID列表
            current_node_id: 当前节点ID
            in_parallel_id: 当前并行分支ID
            
        Yields:
            GraphEngineEvent: 并行执行事件
        """
        parallel_id = str(uuid.uuid4())
        
        # 发布并行分支开始事件
        yield ParallelBranchRunStartedEvent(
            parallel_id=parallel_id,
            parallel_start_node_id=current_node_id,
            parallel_branch_nodes=candidates,
            started_at=naive_utc_now()
        )

        try:
            # 创建并行执行任务
            parallel_futures = []
            
            for candidate_node_id in candidates:
                # 为每个候选节点创建独立的执行线程
                future = self.thread_pool.submit(
                    preserve_flask_contexts(self._run_parallel_branch),
                    candidate_node_id,
                    parallel_id,
                    in_parallel_id,
                    current_node_id
                )
                
                # 添加完成回调
                future.add_done_callback(self.thread_pool.task_done_callback)
                parallel_futures.append(future)

            # 等待所有并行分支完成
            completed_results = []
            for future in parallel_futures:
                try:
                    branch_result = future.result()
                    completed_results.append(branch_result)
                except Exception as e:
                    logger.exception(f"并行分支执行失败: {e}")
                    # 发布并行分支失败事件
                    yield ParallelBranchRunFailedEvent(
                        parallel_id=parallel_id,
                        error=str(e),
                        failed_at=naive_utc_now()
                    )
                    raise e

            # 发布并行分支成功事件
            yield ParallelBranchRunSucceededEvent(
                parallel_id=parallel_id,
                results=completed_results,
                succeeded_at=naive_utc_now()
            )

        except Exception as e:
            logger.exception(f"并行执行失败: {e}")
            yield ParallelBranchRunFailedEvent(
                parallel_id=parallel_id,
                error=str(e),
                failed_at=naive_utc_now()
            )
            raise e

    def _run_parallel_branch(
        self,
        start_node_id: str,
        parallel_id: str,
        parent_parallel_id: Optional[str],
        parent_start_node_id: str
    ):
        """
        运行并行分支
        在独立线程中执行单个并行分支
        
        Args:
            start_node_id: 分支起始节点ID
            parallel_id: 并行分支ID
            parent_parallel_id: 父并行分支ID
            parent_start_node_id: 父起始节点ID
            
        Returns:
            并行分支执行结果
        """
        try:
            # 创建分支专用的状态副本
            branch_state = self._create_branch_state(parallel_id)
            
            # 在分支状态中执行
            branch_events = list(self._run(
                start_node_id=start_node_id,
                in_parallel_id=parallel_id,
                parent_parallel_id=parent_parallel_id,
                parent_parallel_start_node_id=parent_start_node_id
            ))
            
            return {
                "parallel_id": parallel_id,
                "start_node_id": start_node_id,
                "events": branch_events,
                "status": "success"
            }
            
        except Exception as e:
            logger.exception(f"并行分支 {parallel_id} 执行失败")
            return {
                "parallel_id": parallel_id,
                "start_node_id": start_node_id,
                "error": str(e),
                "status": "failed"
            }

    def _is_timed_out(self, start_at: float, max_execution_time: int) -> bool:
        """
        检查是否超时
        
        Args:
            start_at: 开始时间戳
            max_execution_time: 最大执行时间(秒)
            
        Returns:
            bool: 是否超时
        """
        if max_execution_time <= 0:
            return False
        
        current_time = time.time()
        elapsed_time = current_time - start_at
        
        return elapsed_time > max_execution_time


class GraphEngineThreadPool(ThreadPoolExecutor):
    """
    图执行引擎专用线程池
    扩展标准线程池,增加提交计数和资源控制
    """
    
    def __init__(
        self,
        max_workers=None,
        thread_name_prefix="",
        initializer=None,
        initargs=(),
        max_submit_count=dify_config.MAX_SUBMIT_COUNT,
    ):
        """
        初始化专用线程池
        
        Args:
            max_workers: 最大工作线程数
            thread_name_prefix: 线程名前缀
            initializer: 线程初始化器
            initargs: 初始化参数
            max_submit_count: 最大提交任务数
        """
        super().__init__(max_workers, thread_name_prefix, initializer, initargs)
        self.max_submit_count = max_submit_count
        self.submit_count = 0
        self._lock = threading.Lock()

    def submit(self, fn, /, *args, **kwargs):
        """
        提交任务到线程池
        增加提交计数和容量检查
        
        Args:
            fn: 要执行的函数
            *args: 位置参数
            **kwargs: 关键字参数
            
        Returns:
            Future: 任务Future对象
        """
        with self._lock:
            self.submit_count += 1
            self.check_is_full()

        future = super().submit(fn, *args, **kwargs)
        
        # 添加完成回调,用于减少计数
        future.add_done_callback(self.task_done_callback)
        
        return future

    def task_done_callback(self, future):
        """
        任务完成回调
        减少提交计数,释放资源
        
        Args:
            future: 完成的Future对象
        """
        with self._lock:
            self.submit_count -= 1

    def check_is_full(self):
        """
        检查线程池是否已满
        防止过多任务导致资源耗尽
        """
        if self.submit_count > self.max_submit_count:
            raise ValueError(
                f"线程池已达最大提交任务数 {self.max_submit_count}"
            )

    def get_current_load(self) -> float:
        """
        获取当前负载
        
        Returns:
            float: 当前负载比例 (0.0-1.0)
        """
        with self._lock:
            return self.submit_count / self.max_submit_count


class Graph(BaseModel):
    """
    工作流图结构
    表示节点和边构成的有向无环图
    """
    
    # 根节点ID
    root_node_id: str = Field(..., description="图的根节点ID")
    
    # 节点ID到配置的映射
    node_id_config_mapping: dict[str, Any] = Field(
        default_factory=dict, description="节点ID到配置的映射"
    )
    
    # 边映射:源节点ID -> 目标边列表
    edge_mapping: dict[str, list[GraphEdge]] = Field(
        default_factory=dict, description="边映射"
    )
    
    # 反向边映射:目标节点ID -> 源边列表
    reverse_edge_mapping: dict[str, list[GraphEdge]] = Field(
        default_factory=dict, description="反向边映射"
    )
    
    # 并行映射
    parallel_mapping: dict[str, GraphParallel] = Field(
        default_factory=dict, description="并行映射"
    )
    
    # 节点到并行的映射
    node_parallel_mapping: dict[str, str] = Field(
        default_factory=dict, description="节点到并行的映射"
    )

    @classmethod
    def init(
        cls, 
        graph_config: Mapping[str, Any], 
        root_node_id: Optional[str] = None
    ) -> "Graph":
        """
        从配置初始化图结构
        解析工作流配置并构建图数据结构
        
        Args:
            graph_config: 图配置字典
            root_node_id: 可选的根节点ID
            
        Returns:
            Graph: 初始化的图实例
        """
        # 提取节点配置
        nodes_config = graph_config.get("nodes", [])
        edges_config = graph_config.get("edges", [])

        # 构建节点映射
        all_node_id_config_mapping = {
            node_config.get("id"): node_config 
            for node_config in nodes_config
        }

        # 构建边映射
        edge_mapping = {}
        reverse_edge_mapping = {}
        
        for edge_config in edges_config:
            source_node_id = edge_config.get("source")
            target_node_id = edge_config.get("target")
            
            # 创建图边对象
            graph_edge = GraphEdge(
                source_node_id=source_node_id,
                target_node_id=target_node_id,
                condition=edge_config.get("condition")
            )
            
            # 添加到正向映射
            if source_node_id not in edge_mapping:
                edge_mapping[source_node_id] = []
            edge_mapping[source_node_id].append(graph_edge)
            
            # 添加到反向映射
            if target_node_id not in reverse_edge_mapping:
                reverse_edge_mapping[target_node_id] = []
            reverse_edge_mapping[target_node_id].append(graph_edge)

        # 确定根节点
        if not root_node_id:
            # 查找START类型的节点作为根节点
            root_node_configs = [
                node_config for node_config in nodes_config
                if node_config.get("data", {}).get("type") == NodeType.START.value
            ]
            
            if not root_node_configs:
                raise ValueError("未找到START类型的根节点")
            
            root_node_id = root_node_configs[0].get("id")

        # 验证根节点存在
        if root_node_id not in all_node_id_config_mapping:
            raise ValueError(f"根节点 {root_node_id} 在图中不存在")

        # 检查图的连通性
        cls._validate_graph_connectivity(
            root_node_id=root_node_id,
            edge_mapping=edge_mapping
        )

        # 获取所有可达节点
        reachable_node_ids = cls._get_reachable_nodes(
            root_node_id=root_node_id,
            edge_mapping=edge_mapping
        )

        # 构建可达节点的配置映射
        node_id_config_mapping = {
            node_id: all_node_id_config_mapping[node_id] 
            for node_id in reachable_node_ids
        }

        # 构建并行映射
        parallel_mapping = {}
        node_parallel_mapping = {}
        cls._build_parallel_mapping(
            edge_mapping=edge_mapping,
            reverse_edge_mapping=reverse_edge_mapping,
            start_node_id=root_node_id,
            parallel_mapping=parallel_mapping,
            node_parallel_mapping=node_parallel_mapping,
        )

        # 验证并行层次不超过限制
        cls._validate_parallel_depth(
            parallel_mapping=parallel_mapping,
            max_depth=dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT
        )

        return cls(
            root_node_id=root_node_id,
            node_id_config_mapping=node_id_config_mapping,
            edge_mapping=edge_mapping,
            reverse_edge_mapping=reverse_edge_mapping,
            parallel_mapping=parallel_mapping,
            node_parallel_mapping=node_parallel_mapping
        )

    @classmethod
    def _validate_graph_connectivity(
        cls,
        root_node_id: str,
        edge_mapping: dict[str, list[GraphEdge]]
    ):
        """
        验证图的连通性
        确保图中没有孤立节点和环路
        
        Args:
            root_node_id: 根节点ID
            edge_mapping: 边映射
        """
        # 使用深度优先搜索检查连通性
        visited = set()
        stack = [root_node_id]
        
        while stack:
            current_node = stack.pop()
            if current_node in visited:
                continue
            
            visited.add(current_node)
            
            # 添加后续节点到栈中
            for edge in edge_mapping.get(current_node, []):
                if edge.target_node_id not in visited:
                    stack.append(edge.target_node_id)

    @classmethod
    def _get_reachable_nodes(
        cls,
        root_node_id: str,
        edge_mapping: dict[str, list[GraphEdge]]
    ) -> list[str]:
        """
        获取从根节点可达的所有节点
        
        Args:
            root_node_id: 根节点ID
            edge_mapping: 边映射
            
        Returns:
            list[str]: 可达节点ID列表
        """
        reachable_nodes = [root_node_id]
        visited = {root_node_id}
        queue = [root_node_id]
        
        while queue:
            current_node = queue.pop(0)
            
            for edge in edge_mapping.get(current_node, []):
                target_node = edge.target_node_id
                if target_node not in visited:
                    visited.add(target_node)
                    reachable_nodes.append(target_node)
                    queue.append(target_node)
        
        return reachable_nodes


class GraphEdge(BaseModel):
    """
    图边表示
    连接两个节点,可包含条件表达式
    """
    
    # 源节点ID
    source_node_id: str
    
    # 目标节点ID
    target_node_id: str
    
    # 可选的条件表达式
    condition: Optional[str] = None
    
    # 边的元数据
    metadata: dict[str, Any] = Field(default_factory=dict)


class GraphParallel(BaseModel):
    """
    图并行分支表示
    定义并行执行的分支结构
    """
    
    # 并行分支ID
    parallel_id: str
    
    # 并行起始节点ID
    start_node_id: str
    
    # 并行分支节点列表
    branch_node_ids: list[str]
    
    # 父并行分支ID(支持嵌套并行)
    parent_parallel_id: Optional[str] = None
    
    # 并行策略
    strategy: ParallelStrategy = ParallelStrategy.ALL
    
    # 超时配置
    timeout: Optional[int] = None


class ParallelStrategy(Enum):
    """并行执行策略"""
    
    # 等待所有分支完成
    ALL = "all"
    
    # 等待任意一个分支完成
    ANY = "any" 
    
    # 等待指定数量的分支完成
    COUNT = "count"

3. 节点系统深度解析

3.1 节点基类架构

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class BaseNode:
    """
    工作流节点基类
    定义所有节点的通用接口和行为
    """
    
    # 节点类型(子类必须定义)
    _node_type: ClassVar[NodeType]

    def __init__(
        self,
        id: str,
        config: Mapping[str, Any],
        graph_init_params: "GraphInitParams",
        graph: "Graph",
        graph_runtime_state: "GraphRuntimeState",
        previous_node_id: Optional[str] = None,
        thread_pool_id: Optional[str] = None,
    ):
        """
        初始化节点基类
        
        Args:
            id: 节点唯一标识符
            config: 节点配置字典
            graph_init_params: 图初始化参数
            graph: 图结构实例
            graph_runtime_state: 图运行时状态
            previous_node_id: 前置节点ID
            thread_pool_id: 线程池ID
        """
        # 基础标识信息
        self.id = id
        self.node_id = config.get("id")
        
        # 租户和应用信息
        self.tenant_id = graph_init_params.tenant_id
        self.app_id = graph_init_params.app_id
        self.workflow_type = graph_init_params.workflow_type
        self.workflow_id = graph_init_params.workflow_id
        
        # 用户信息
        self.user_id = graph_init_params.user_id
        self.user_from = graph_init_params.user_from
        self.invoke_from = graph_init_params.invoke_from
        
        # 执行上下文
        self.workflow_call_depth = graph_init_params.call_depth
        self.graph = graph
        self.graph_runtime_state = graph_runtime_state
        self.previous_node_id = previous_node_id
        self.thread_pool_id = thread_pool_id
        self.graph_config = graph_init_params.graph_config

        # 验证节点ID
        if not self.node_id:
            raise ValueError("节点ID是必需的")

    @abstractmethod
    def init_node_data(self, data: Mapping[str, Any]):
        """
        初始化节点数据
        子类必须实现,用于解析和验证节点特定的配置
        
        Args:
            data: 节点数据字典
        """
        raise NotImplementedError("子类必须实现init_node_data方法")

    @abstractmethod
    def run(self, node_inputs: Mapping[str, Any]) -> Generator[NodeEvent, None, None]:
        """
        执行节点逻辑
        子类必须实现,定义节点的具体执行逻辑
        
        Args:
            node_inputs: 节点输入变量
            
        Yields:
            NodeEvent: 节点执行过程中的事件
        """
        raise NotImplementedError("子类必须实现run方法")

    def _get_error_strategy(self) -> Optional[ErrorStrategy]:
        """
        获取错误处理策略
        子类可以覆盖此方法以定义特定的错误处理策略
        
        Returns:
            Optional[ErrorStrategy]: 错误处理策略
        """
        return None

    def _get_retry_config(self) -> RetryConfig:
        """
        获取重试配置
        子类可以覆盖此方法以定义特定的重试策略
        
        Returns:
            RetryConfig: 重试配置
        """
        return RetryConfig()

    def get_variable(self, variable_selector: VariableSelector) -> Any:
        """
        从变量池获取变量值
        
        Args:
            variable_selector: 变量选择器
            
        Returns:
            Any: 变量值
        """
        return self.graph_runtime_state.variable_pool.get(variable_selector)

    def set_variable(self, variable_key: str, variable_value: Any):
        """
        设置变量到变量池
        
        Args:
            variable_key: 变量键
            variable_value: 变量值
        """
        self.graph_runtime_state.variable_pool.add_variable(
            node_id=self.node_id,
            variable_key=variable_key,
            variable_value=variable_value
        )

    def create_node_run_result(
        self,
        status: WorkflowNodeExecutionStatus,
        inputs: Optional[dict[str, Any]] = None,
        outputs: Optional[dict[str, Any]] = None,
        metadata: Optional[dict[str, Any]] = None,
        error: Optional[str] = None
    ) -> NodeRunResult:
        """
        创建节点运行结果
        
        Args:
            status: 执行状态
            inputs: 输入变量
            outputs: 输出变量
            metadata: 元数据
            error: 错误信息
            
        Returns:
            NodeRunResult: 节点运行结果
        """
        return NodeRunResult(
            status=status,
            inputs=inputs or {},
            outputs=outputs or {},
            metadata=metadata or {},
            error=error
        )


class BaseNodeData(BaseModel):
    """
    节点数据基类
    定义所有节点配置的通用字段
    """
    
    # 节点标题
    title: str
    
    # 节点描述
    desc: Optional[str] = None
    
    # 错误处理策略
    error_strategy: Optional[ErrorStrategy] = None
    
    # 重试配置
    retry_config: RetryConfig = Field(default_factory=RetryConfig)
    
    # 自定义配置
    custom_config: dict[str, Any] = Field(default_factory=dict)


class RetryConfig(BaseModel):
    """重试配置"""
    
    # 是否启用重试
    enabled: bool = False
    
    # 最大重试次数
    max_attempts: int = 3
    
    # 重试间隔(秒)
    retry_interval: int = 1
    
    # 重试策略
    strategy: RetryStrategy = RetryStrategy.FIXED_INTERVAL


class RetryStrategy(Enum):
    """重试策略枚举"""
    
    # 固定间隔
    FIXED_INTERVAL = "fixed_interval"
    
    # 指数退避
    EXPONENTIAL_BACKOFF = "exponential_backoff"
    
    # 线性退避
    LINEAR_BACKOFF = "linear_backoff"

3.2 节点类型深度解析

根据社区实践分析,Dify的20+种节点类型可以按功能特征分为以下几大类:

节点类型功能矩阵

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 基于实际使用场景的节点分类
NODE_CATEGORY_MATRIX = {
    "输入输出控制": {
        "start": "工作流入口,初始化变量和上下文",
        "end": "工作流出口,收集最终结果",
        "answer": "用户可见的响应输出节点"
    },
    
    "AI推理决策": {
        "llm": "大语言模型推理,支持结构化输出",
        "agent": "智能体节点,支持工具调用和多轮推理", 
        "knowledge-retrieval": "知识库检索,RAG核心节点",
        "question-classifier": "问题分类,智能路由决策"
    },
    
    "数据处理转换": {
        "code": "代码执行,支持Python/JS/TS多语言",
        "template-transform": "模板转换,变量替换和格式化",
        "parameter-extractor": "参数提取,从文本中提取结构化数据",
        "document-extractor": "文档解析,提取关键信息",
        "list-operator": "列表操作,支持过滤、排序、聚合"
    },
    
    "外部系统集成": {
        "tool": "外部工具调用,API集成",
        "http-request": "HTTP请求,RESTful API调用"
    },
    
    "流程控制": {
        "if-else": "条件分支,基于条件表达式的路由",
        "loop": "循环控制,支持条件循环和计数循环",
        "iteration": "迭代处理,对列表数据逐项处理"
    },
    
    "变量管理": {
        "variable-assigner": "变量赋值,支持复杂表达式",
        "variable-aggregator": "变量聚合,合并多个数据源"
    }
}

# 节点性能特征分析
NODE_PERFORMANCE_CHARACTERISTICS = {
    "高计算密集型": ["llm", "agent", "code"],           # CPU/GPU密集
    "I/O密集型": ["knowledge-retrieval", "http-request", "tool"], # 网络I/O
    "内存密集型": ["document-extractor", "list-operator"],        # 大数据处理
    "实时性要求高": ["if-else", "variable-assigner"],            # 毫秒级响应
    "可并行化": ["iteration", "loop", "template-transform"]       # 支持并行优化
}

生产环境节点配置优化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 基于实际部署的节点配置最佳实践
PRODUCTION_NODE_CONFIG = {
    # LLM节点生产优化
    "llm_node_optimization": {
        "timeout_config": {
            "default_timeout": 120,        # 2分钟默认超时
            "streaming_timeout": 300,      # 流式输出5分钟超时
            "batch_timeout": 600          # 批处理10分钟超时
        },
        "retry_config": {
            "max_retries": 3,
            "retry_delay": [1, 2, 4],     # 递增延迟重试
            "retry_on_errors": ["timeout", "rate_limit", "server_error"]
        },
        "resource_limits": {
            "max_prompt_tokens": 100000,   # 最大提示令牌数
            "max_completion_tokens": 4096, # 最大完成令牌数
            "memory_limit": "1Gi"          # 内存限制
        }
    },
    
    # Code节点安全优化
    "code_node_security": {
        "execution_environment": "docker_sandbox",
        "allowed_libraries": {
            "python": ["json", "math", "datetime", "re", "pandas", "numpy"],
            "javascript": ["lodash", "moment", "crypto-js"],
            "typescript": ["lodash", "@types/node"]
        },
        "resource_limits": {
            "cpu_limit": "500m",           # CPU限制
            "memory_limit": "512Mi",       # 内存限制
            "execution_timeout": 30,       # 执行超时
            "network_access": False        # 禁用网络访问
        },
        "security_policies": {
            "file_system_access": "read_only",
            "process_spawning": False,
            "system_calls": "restricted"
        }
    },
    
    # 知识检索节点性能优化
    "knowledge_retrieval_optimization": {
        "search_config": {
            "default_top_k": 5,
            "max_top_k": 20,
            "score_threshold": 0.7,
            "rerank_enabled": True
        },
        "cache_config": {
            "query_cache_ttl": 300,        # 查询缓存5分钟
            "embedding_cache_ttl": 3600,   # 向量缓存1小时
            "result_cache_enabled": True
        },
        "performance_config": {
            "concurrent_searches": 3,      # 并发搜索数
            "batch_embedding_size": 50,    # 批量嵌入大小
            "connection_pool_size": 10     # 连接池大小
        }
    }
}

3.2 LLM节点实现

LLM节点是工作流中最重要的节点类型之一,负责调用大语言模型:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
class LLMNode(BaseNode):
    """
    LLM语言模型节点
    负责调用大语言模型进行文本生成和理解
    """
    
    _node_type = NodeType.LLM
    _node_data: LLMNodeData
    
    # 编译的正则表达式,用于提取<think>块
    _THINK_PATTERN = re.compile(r"<think[^>]*>(.*?)</think>", re.IGNORECASE | re.DOTALL)
    
    # 多模态文件输出
    _file_outputs: list["File"]
    
    # 文件保存器
    _llm_file_saver: LLMFileSaver

    def __init__(
        self,
        id: str,
        config: Mapping[str, Any],
        graph_init_params: "GraphInitParams",
        graph: "Graph",
        graph_runtime_state: "GraphRuntimeState",
        previous_node_id: Optional[str] = None,
        thread_pool_id: Optional[str] = None,
        *,
        llm_file_saver: LLMFileSaver | None = None,
    ):
        """
        初始化LLM节点
        
        Args:
            id: 节点ID
            config: 节点配置
            graph_init_params: 图初始化参数
            graph: 图实例
            graph_runtime_state: 图运行时状态
            previous_node_id: 前置节点ID
            thread_pool_id: 线程池ID
            llm_file_saver: 可选的文件保存器
        """
        super().__init__(
            id=id,
            config=config,
            graph_init_params=graph_init_params,
            graph=graph,
            graph_runtime_state=graph_runtime_state,
            previous_node_id=previous_node_id,
            thread_pool_id=thread_pool_id,
        )
        
        # 初始化文件输出列表
        self._file_outputs = []

        # 设置文件保存器
        if llm_file_saver is None:
            llm_file_saver = FileSaverImpl(
                user_id=graph_init_params.user_id,
                tenant_id=graph_init_params.tenant_id,
            )
        self._llm_file_saver = llm_file_saver

    def init_node_data(self, data: Mapping[str, Any]):
        """
        初始化LLM节点数据
        解析和验证LLM节点的配置参数
        
        Args:
            data: 节点数据字典
        """
        self._node_data = LLMNodeData.model_validate(data)
        
        # 验证模型配置
        if not self._node_data.model_config:
            raise LLMNodeError("LLM节点必须配置模型")
        
        # 验证提示配置
        if not self._node_data.prompt_template:
            raise LLMNodeError("LLM节点必须配置提示模板")

    def run(self, node_inputs: Mapping[str, Any]) -> Generator[NodeEvent, None, None]:
        """
        执行LLM节点
        包含提示构建、模型调用、结果处理的完整流程
        
        Args:
            node_inputs: 节点输入变量
            
        Yields:
            NodeEvent: 节点执行事件
        """
        try:
            # 1. 构建提示消息
            prompt_messages = self._build_prompt_messages(node_inputs)
            
            # 2. 获取模型实例
            model_instance = self._get_model_instance()
            
            # 3. 检查并调整模型参数
            self._adjust_model_parameters(model_instance, prompt_messages)
            
            # 4. 调用模型推理
            yield from self._invoke_llm_model(
                model_instance=model_instance,
                prompt_messages=prompt_messages,
                node_inputs=node_inputs
            )

        except Exception as e:
            logger.exception(f"LLM节点 {self.node_id} 执行失败")
            yield RunCompletedEvent(
                run_result=self.create_node_run_result(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs=dict(node_inputs),
                    error=str(e)
                )
            )

    def _build_prompt_messages(self, node_inputs: Mapping[str, Any]) -> list[PromptMessage]:
        """
        构建提示消息
        将模板和变量组合成完整的提示消息列表
        
        Args:
            node_inputs: 节点输入变量
            
        Returns:
            list[PromptMessage]: 提示消息列表
        """
        prompt_template = self._node_data.prompt_template
        variable_pool = self.graph_runtime_state.variable_pool
        
        messages = []
        
        # 处理不同类型的提示模板
        if prompt_template.prompt_type == PromptTemplateType.SIMPLE:
            # 简单提示模板
            prompt_text = prompt_template.simple_prompt_template
            
            # 替换模板变量
            rendered_prompt = VariableTemplateParser.render(
                template=prompt_text,
                variable_pool=variable_pool,
                node_inputs=node_inputs
            )
            
            messages.append(UserPromptMessage(content=rendered_prompt))
            
        elif prompt_template.prompt_type == PromptTemplateType.ADVANCED:
            # 高级提示模板(支持多轮对话)
            for message_template in prompt_template.advanced_chat_prompt_template.messages:
                # 渲染消息内容
                rendered_content = VariableTemplateParser.render(
                    template=message_template.text,
                    variable_pool=variable_pool,
                    node_inputs=node_inputs
                )
                
                # 根据角色创建相应的消息
                if message_template.role == PromptMessageRole.SYSTEM:
                    messages.append(SystemPromptMessage(content=rendered_content))
                elif message_template.role == PromptMessageRole.USER:
                    messages.append(UserPromptMessage(content=rendered_content))
                elif message_template.role == PromptMessageRole.ASSISTANT:
                    messages.append(AssistantPromptMessage(content=rendered_content))

        # 处理文件输入(多模态支持)
        if self._should_include_files(node_inputs):
            messages = self._add_file_content_to_messages(messages, node_inputs)

        return messages

    def _invoke_llm_model(
        self,
        model_instance: ModelInstance,
        prompt_messages: list[PromptMessage],
        node_inputs: Mapping[str, Any]
    ) -> Generator[NodeEvent, None, None]:
        """
        调用LLM模型
        执行实际的模型推理并处理结果
        
        Args:
            model_instance: 模型实例
            prompt_messages: 提示消息列表
            node_inputs: 节点输入
            
        Yields:
            NodeEvent: 模型调用事件
        """
        model_config = self._node_data.model_config
        
        try:
            # 判断是否为结构化输出
            if self._node_data.output_schema:
                # 结构化输出模式
                yield from self._invoke_structured_output(
                    model_instance=model_instance,
                    prompt_messages=prompt_messages,
                    output_schema=self._node_data.output_schema,
                    node_inputs=node_inputs
                )
            else:
                # 普通文本输出模式
                yield from self._invoke_text_generation(
                    model_instance=model_instance,
                    prompt_messages=prompt_messages,
                    node_inputs=node_inputs
                )

        except Exception as e:
            logger.exception(f"模型调用失败: {e}")
            yield RunCompletedEvent(
                run_result=self.create_node_run_result(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs=dict(node_inputs),
                    error=f"模型调用失败: {str(e)}"
                )
            )

    def _invoke_text_generation(
        self,
        model_instance: ModelInstance,
        prompt_messages: list[PromptMessage],
        node_inputs: Mapping[str, Any]
    ) -> Generator[NodeEvent, None, None]:
        """
        执行文本生成
        标准的LLM文本生成流程
        
        Args:
            model_instance: 模型实例
            prompt_messages: 提示消息
            node_inputs: 节点输入
            
        Yields:
            NodeEvent: 生成过程事件
        """
        model_config = self._node_data.model_config
        
        # 调用模型
        result_generator = model_instance.invoke_llm(
            prompt_messages=prompt_messages,
            model_parameters=model_config.completion_params,
            stop=model_config.stop,
            stream=True,  # 启用流式输出
            user=self.user_id,
        )

        # 处理流式结果
        full_text = ""
        usage = None
        
        for chunk in result_generator:
            if isinstance(chunk, LLMResultChunk):
                # 处理流式输出块
                if chunk.delta.message.content:
                    content = chunk.delta.message.content
                    full_text += content
                    
                    # 发布流式输出事件
                    yield RunStreamChunkEvent(
                        chunk=content,
                        chunk_type="text"
                    )
                
                # 记录使用情况
                if chunk.delta.usage:
                    usage = chunk.delta.usage
            
            elif isinstance(chunk, LLMResult):
                # 非流式结果
                full_text = chunk.message.content or ""
                usage = chunk.usage
                break

        # 后处理生成的内容
        processed_text = self._post_process_llm_output(full_text)
        
        # 处理多模态输出(如果有)
        file_outputs = self._extract_file_outputs(processed_text)
        
        # 构建输出变量
        outputs = {
            "text": processed_text,
            "usage": {
                "prompt_tokens": usage.prompt_tokens if usage else 0,
                "completion_tokens": usage.completion_tokens if usage else 0,
                "total_tokens": usage.total_tokens if usage else 0,
                "total_price": usage.total_price if usage else 0.0,
            }
        }
        
        # 添加文件输出
        if file_outputs:
            outputs["files"] = file_outputs

        # 发布完成事件
        yield RunCompletedEvent(
            run_result=self.create_node_run_result(
                status=WorkflowNodeExecutionStatus.SUCCEEDED,
                inputs=dict(node_inputs),
                outputs=outputs,
                metadata={
                    "model_provider": model_config.provider,
                    "model_name": model_config.model,
                    "prompt_messages_count": len(prompt_messages),
                }
            )
        )

    def _invoke_structured_output(
        self,
        model_instance: ModelInstance,
        prompt_messages: list[PromptMessage],
        output_schema: dict,
        node_inputs: Mapping[str, Any]
    ) -> Generator[NodeEvent, None, None]:
        """
        执行结构化输出
        使用schema约束模型输出格式
        
        Args:
            model_instance: 模型实例
            prompt_messages: 提示消息
            output_schema: 输出schema定义
            node_inputs: 节点输入
            
        Yields:
            NodeEvent: 结构化输出事件
        """
        try:
            # 调用结构化输出接口
            result_generator = invoke_llm_with_structured_output(
                model_instance=model_instance,
                prompt_messages=prompt_messages,
                output_schema=output_schema,
                stream=True,
                user=self.user_id
            )

            structured_output = None
            full_text = ""
            usage = None

            for chunk in result_generator:
                if isinstance(chunk, LLMResultChunkWithStructuredOutput):
                    # 处理结构化输出块
                    if chunk.delta.message.content:
                        content = chunk.delta.message.content
                        full_text += content
                        
                        # 发布流式输出
                        yield RunStreamChunkEvent(
                            chunk=content,
                            chunk_type="structured"
                        )
                    
                    # 获取结构化输出
                    if chunk.delta.structured_output:
                        structured_output = chunk.delta.structured_output
                    
                    if chunk.delta.usage:
                        usage = chunk.delta.usage

            # 验证结构化输出
            if not structured_output:
                raise OutputParserError("未能生成有效的结构化输出")

            # 构建输出
            outputs = {
                "text": full_text,
                "structured_output": structured_output.data,
                "schema_valid": True,
                "usage": {
                    "prompt_tokens": usage.prompt_tokens if usage else 0,
                    "completion_tokens": usage.completion_tokens if usage else 0,
                    "total_tokens": usage.total_tokens if usage else 0,
                    "total_price": usage.total_price if usage else 0.0,
                }
            }

            yield RunCompletedEvent(
                run_result=self.create_node_run_result(
                    status=WorkflowNodeExecutionStatus.SUCCEEDED,
                    inputs=dict(node_inputs),
                    outputs=outputs,
                    metadata={
                        "output_type": "structured",
                        "schema": output_schema,
                    }
                )
            )

        except OutputParserError as e:
            # 结构化输出解析失败
            logger.error(f"结构化输出解析失败: {e}")
            yield RunCompletedEvent(
                run_result=self.create_node_run_result(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs=dict(node_inputs),
                    error=f"结构化输出解析失败: {str(e)}"
                )
            )

    def _post_process_llm_output(self, text: str) -> str:
        """
        后处理LLM输出
        清理和优化模型生成的文本
        
        Args:
            text: 原始输出文本
            
        Returns:
            str: 后处理的文本
        """
        if not text:
            return ""
        
        # 移除<think>标签块(如果存在)
        text = self._THINK_PATTERN.sub("", text)
        
        # 清理多余的空白字符
        text = re.sub(r'\n\s*\n\s*\n', '\n\n', text)  # 多个空行合并为两个
        text = text.strip()
        
        return text

    def _extract_file_outputs(self, text: str) -> list[dict]:
        """
        从文本中提取文件输出
        解析文本中的文件引用并生成文件对象
        
        Args:
            text: 生成的文本
            
        Returns:
            list[dict]: 文件输出列表
        """
        file_outputs = []
        
        # 查找base64编码的图片
        base64_pattern = r'data:image/([^;]+);base64,([^"]+)'
        matches = re.finditer(base64_pattern, text)
        
        for match in matches:
            image_format = match.group(1)
            base64_data = match.group(2)
            
            try:
                # 解码base64数据
                image_data = base64.b64decode(base64_data)
                
                # 保存文件
                file_obj = self._llm_file_saver.save_file(
                    content=image_data,
                    file_type=FileType.IMAGE,
                    file_extension=image_format,
                    metadata={
                        "source": "llm_generation",
                        "node_id": self.node_id,
                        "generated_at": time.time()
                    }
                )
                
                file_outputs.append({
                    "id": file_obj.id,
                    "type": "image",
                    "url": file_obj.url,
                    "filename": file_obj.filename,
                    "size": len(image_data)
                })
                
            except Exception as e:
                logger.warning(f"提取生成的图片失败: {e}")
                continue
        
        return file_outputs

    def _get_model_instance(self) -> ModelInstance:
        """
        获取模型实例
        根据节点配置创建模型实例
        
        Returns:
            ModelInstance: 模型实例
        """
        model_config = self._node_data.model_config
        
        # 创建模型管理器
        model_manager = ModelManager()
        
        # 获取模型实例
        model_instance = model_manager.get_model_instance(
            tenant_id=self.tenant_id,
            model_type=ModelType.LLM,
            provider=model_config.provider,
            model=model_config.model
        )
        
        return model_instance

    def _adjust_model_parameters(
        self,
        model_instance: ModelInstance,
        prompt_messages: list[PromptMessage]
    ):
        """
        调整模型参数
        根据提示长度和模型限制自动调整参数
        
        Args:
            model_instance: 模型实例
            prompt_messages: 提示消息列表
        """
        model_config = self._node_data.model_config
        
        # 获取模型上下文长度限制
        model_context_tokens = model_config.model_schema.model_properties.get(
            ModelPropertyKey.CONTEXT_SIZE
        )
        
        if not model_context_tokens:
            return  # 无法获取上下文限制,跳过调整

        # 计算提示令牌数
        prompt_tokens = model_instance.get_llm_num_tokens(prompt_messages)
        
        # 获取当前max_tokens设置
        max_tokens = model_config.completion_params.get("max_tokens", 2048)
        
        # 检查是否超过模型限制
        if prompt_tokens + max_tokens > model_context_tokens:
            # 调整max_tokens以适应上下文限制
            adjusted_max_tokens = max(model_context_tokens - prompt_tokens, 16)
            
            logger.info(
                f"调整max_tokens: {max_tokens} -> {adjusted_max_tokens} "
                f"(提示令牌: {prompt_tokens}, 上下文限制: {model_context_tokens})"
            )
            
            model_config.completion_params["max_tokens"] = adjusted_max_tokens


class LLMNodeData(BaseNodeData):
    """
    LLM节点数据配置
    定义LLM节点的所有配置选项
    """
    
    # 模型配置
    model_config: ModelConfig
    
    # 提示模板配置
    prompt_template: PromptTemplateConfig
    
    # 输出schema(用于结构化输出)
    output_schema: Optional[dict] = None
    
    # 记忆配置
    memory_config: Optional[MemoryConfig] = None
    
    # 上下文配置
    context_config: Optional[ContextConfig] = None
    
    # 视觉配置(多模态)
    vision_config: Optional[VisionConfig] = None


class ModelConfig(BaseModel):
    """模型配置"""
    
    # 模型提供者
    provider: str
    
    # 模型名称
    model: str
    
    # 模型模式
    mode: LLMMode
    
    # 完成参数
    completion_params: dict[str, Any] = Field(default_factory=dict)
    
    # 停止词
    stop: list[str] = Field(default_factory=list)
    
    # 模型schema
    model_schema: Optional[dict] = None


class PromptTemplateConfig(BaseModel):
    """提示模板配置"""
    
    # 提示类型
    prompt_type: PromptTemplateType
    
    # 简单提示模板
    simple_prompt_template: Optional[str] = None
    
    # 高级聊天提示模板
    advanced_chat_prompt_template: Optional[AdvancedChatPromptTemplate] = None
    
    # 高级完成提示模板
    advanced_completion_prompt_template: Optional[AdvancedCompletionPromptTemplate] = None


class PromptTemplateType(Enum):
    """提示模板类型"""
    
    SIMPLE = "simple"      # 简单文本提示
    ADVANCED = "advanced"  # 高级多轮对话提示

3.3 工具节点实现

工具节点支持调用各种外部工具和API:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
class ToolNode(BaseNode):
    """
    工具节点
    支持调用内置工具、API工具和自定义工具
    """
    
    _node_type = NodeType.TOOL
    _node_data: ToolNodeData

    def init_node_data(self, data: Mapping[str, Any]):
        """初始化工具节点数据"""
        self._node_data = ToolNodeData.model_validate(data)

    def run(self, node_inputs: Mapping[str, Any]) -> Generator[NodeEvent, None, None]:
        """
        执行工具节点
        根据配置调用相应的工具
        
        Args:
            node_inputs: 节点输入
            
        Yields:
            NodeEvent: 工具执行事件
        """
        try:
            # 1. 获取工具实例
            tool_instance = self._get_tool_instance()
            
            # 2. 准备工具参数
            tool_parameters = self._prepare_tool_parameters(node_inputs)
            
            # 3. 执行工具调用
            yield from self._invoke_tool(
                tool_instance=tool_instance,
                tool_parameters=tool_parameters,
                node_inputs=node_inputs
            )

        except Exception as e:
            logger.exception(f"工具节点 {self.node_id} 执行失败")
            yield RunCompletedEvent(
                run_result=self.create_node_run_result(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs=dict(node_inputs),
                    error=str(e)
                )
            )

    def _get_tool_instance(self) -> Tool:
        """
        获取工具实例
        根据工具类型和配置创建相应的工具实例
        
        Returns:
            Tool: 工具实例
        """
        tool_config = self._node_data.tool_config
        
        if tool_config.tool_type == ToolType.BUILTIN:
            # 内置工具
            return BuiltinToolManager.get_tool(
                provider=tool_config.provider,
                tool_name=tool_config.tool_name,
                tenant_id=self.tenant_id
            )
        
        elif tool_config.tool_type == ToolType.API:
            # API工具
            return ApiToolManager.get_tool(
                tenant_id=self.tenant_id,
                tool_id=tool_config.tool_id,
                invoke_from=self.invoke_from
            )
        
        elif tool_config.tool_type == ToolType.WORKFLOW:
            # 工作流工具
            return WorkflowToolManager.get_tool(
                tenant_id=self.tenant_id,
                app_id=self.app_id,
                workflow_id=tool_config.workflow_id,
                invoke_from=self.invoke_from
            )
        
        else:
            raise ValueError(f"不支持的工具类型: {tool_config.tool_type}")

    def _prepare_tool_parameters(self, node_inputs: Mapping[str, Any]) -> dict[str, Any]:
        """
        准备工具参数
        将节点输入转换为工具所需的参数格式
        
        Args:
            node_inputs: 节点输入
            
        Returns:
            dict[str, Any]: 工具参数字典
        """
        tool_parameters = {}
        
        # 遍历工具参数配置
        for param_config in self._node_data.tool_parameters:
            param_name = param_config.name
            param_type = param_config.type
            
            # 获取参数值
            if param_config.value_selector:
                # 从变量池获取值
                param_value = self.get_variable(param_config.value_selector)
            else:
                # 使用固定值
                param_value = param_config.value
            
            # 类型转换和验证
            param_value = self._convert_parameter_type(
                value=param_value,
                param_type=param_type,
                param_name=param_name
            )
            
            tool_parameters[param_name] = param_value

        return tool_parameters

    def _invoke_tool(
        self,
        tool_instance: Tool,
        tool_parameters: dict[str, Any],
        node_inputs: Mapping[str, Any]
    ) -> Generator[NodeEvent, None, None]:
        """
        执行工具调用
        
        Args:
            tool_instance: 工具实例
            tool_parameters: 工具参数
            node_inputs: 节点输入
            
        Yields:
            NodeEvent: 工具执行事件
        """
        start_time = time.time()
        
        try:
            # 执行工具调用
            tool_result = tool_instance.invoke(
                user_id=self.user_id,
                tool_parameters=tool_parameters
            )
            
            execution_time = time.time() - start_time
            
            # 处理工具结果
            outputs = self._process_tool_result(tool_result)
            
            # 添加执行元数据
            metadata = {
                "tool_type": self._node_data.tool_config.tool_type,
                "tool_name": self._node_data.tool_config.tool_name,
                "execution_time": execution_time,
                "parameters": tool_parameters
            }

            yield RunCompletedEvent(
                run_result=self.create_node_run_result(
                    status=WorkflowNodeExecutionStatus.SUCCEEDED,
                    inputs=dict(node_inputs),
                    outputs=outputs,
                    metadata=metadata
                )
            )

        except Exception as e:
            execution_time = time.time() - start_time
            error_msg = f"工具调用失败: {str(e)}"
            
            logger.exception(error_msg)
            
            yield RunCompletedEvent(
                run_result=self.create_node_run_result(
                    status=WorkflowNodeExecutionStatus.FAILED,
                    inputs=dict(node_inputs),
                    error=error_msg,
                    metadata={
                        "execution_time": execution_time,
                        "error_type": type(e).__name__
                    }
                )
            )

    def _process_tool_result(self, tool_result: ToolInvokeMessage) -> dict[str, Any]:
        """
        处理工具执行结果
        将工具返回的结果转换为节点输出格式
        
        Args:
            tool_result: 工具调用结果
            
        Returns:
            dict[str, Any]: 处理后的输出字典
        """
        outputs = {}
        
        if tool_result.type == ToolInvokeMessage.MessageType.TEXT:
            # 文本结果
            outputs["text"] = tool_result.message
            
        elif tool_result.type == ToolInvokeMessage.MessageType.JSON:
            # JSON结果
            try:
                json_data = json.loads(tool_result.message)
                outputs["json"] = json_data
                outputs["text"] = json.dumps(json_data, ensure_ascii=False, indent=2)
            except json.JSONDecodeError:
                outputs["text"] = tool_result.message
                
        elif tool_result.type == ToolInvokeMessage.MessageType.FILE:
            # 文件结果
            outputs["file"] = {
                "id": tool_result.message,
                "type": "file",
                "url": tool_result.meta.get("url") if tool_result.meta else None
            }
            outputs["text"] = f"文件已生成: {tool_result.message}"
            
        elif tool_result.type == ToolInvokeMessage.MessageType.IMAGE:
            # 图片结果
            outputs["image"] = {
                "id": tool_result.message,
                "type": "image", 
                "url": tool_result.meta.get("url") if tool_result.meta else None
            }
            outputs["text"] = f"图片已生成: {tool_result.message}"
        
        # 添加原始结果
        outputs["raw_result"] = {
            "type": tool_result.type.value,
            "message": tool_result.message,
            "meta": tool_result.meta or {}
        }
        
        return outputs

    def _convert_parameter_type(
        self,
        value: Any,
        param_type: ToolParameterType,
        param_name: str
    ) -> Any:
        """
        转换参数类型
        根据工具参数定义转换输入值的类型
        
        Args:
            value: 输入值
            param_type: 参数类型
            param_name: 参数名
            
        Returns:
            Any: 转换后的值
        """
        if value is None:
            return None
        
        try:
            if param_type == ToolParameterType.STRING:
                return str(value)
            elif param_type == ToolParameterType.NUMBER:
                return float(value) if '.' in str(value) else int(value)
            elif param_type == ToolParameterType.BOOLEAN:
                if isinstance(value, bool):
                    return value
                return str(value).lower() in ('true', '1', 'yes', 'on')
            elif param_type == ToolParameterType.ARRAY:
                if isinstance(value, list):
                    return value
                return [value]  # 单个值转换为列表
            elif param_type == ToolParameterType.OBJECT:
                if isinstance(value, dict):
                    return value
                return {"value": value}  # 包装为对象
            else:
                return value
                
        except (ValueError, TypeError) as e:
            raise ParameterTypeError(
                f"参数 '{param_name}' 的值 '{value}' 无法转换为类型 '{param_type}': {e}"
            )


class ToolNodeData(BaseNodeData, ToolEntity):
    """
    工具节点数据配置
    继承基础节点数据和工具实体
    """
    
    # 工具配置
    tool_config: ToolConfig
    
    # 工具参数配置列表
    tool_parameters: list[ToolParameterConfig] = Field(default_factory=list)


class ToolConfig(BaseModel):
    """工具配置"""
    
    # 工具类型
    tool_type: ToolType
    
    # 工具提供者(内置工具)
    provider: Optional[str] = None
    
    # 工具名称
    tool_name: str
    
    # 工具ID(API工具)
    tool_id: Optional[str] = None
    
    # 工作流ID(工作流工具)
    workflow_id: Optional[str] = None


class ToolParameterConfig(BaseModel):
    """工具参数配置"""
    
    # 参数名称
    name: str
    
    # 参数类型
    type: ToolParameterType
    
    # 固定值
    value: Optional[Any] = None
    
    # 变量选择器
    value_selector: Optional[VariableSelector] = None
    
    # 是否必填
    required: bool = False


class ToolType(Enum):
    """工具类型枚举"""
    
    BUILTIN = "builtin"     # 内置工具
    API = "api"             # API工具
    WORKFLOW = "workflow"   # 工作流工具

4. 总结与性能优化

4.1 工作流模块核心特点

  1. 图执行引擎:支持复杂DAG的高效执行
  2. 丰富节点类型:20+种节点类型覆盖各种业务场景
  3. 并行处理:支持节点的并行执行,提升效率
  4. 错误处理:完善的错误处理和重试机制
  5. 可视化编程:直观的拖拽式工作流设计

4.2 工作流实战应用模式

基于网络案例的工作流应用场景: 根据社区实践和技术文章分析,Dify工作流在以下场景中展现出强大的价值:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class WorkflowApplicationScenarios:
    """
    工作流应用场景分析
    基于实际案例的工作流设计模式
    """
    
    def __init__(self):
        self.scenario_patterns = {
            "内容创作工作流": {
                "应用场景": "自动化内容生成、编辑、发布流程",
                "典型节点组合": [
                    "Start → LLM(大纲生成) → LLM(内容创作)",
                    "→ Code(内容格式化) → Tool(SEO优化)",
                    "→ If-Else(质量检查) → Answer(发布/修改)"
                ],
                "技术亮点": [
                    "多步骤内容生成保证质量",
                    "条件分支实现质量控制",
                    "工具集成提升内容价值",
                    "自动化流程提升效率"
                ],
                "性能数据": {
                    "创作效率": "提升5-10倍",
                    "内容质量": "一致性提升80%+",
                    "人工介入": "减少90%+",
                    "发布周期": "从天级缩短到小时级"
                }
            },
            
            "客户服务工作流": {
                "应用场景": "智能客服的复杂问题处理流程",
                "典型节点组合": [
                    "Start → Question-Classifier(问题分类)",
                    "→ If-Else(简单/复杂问题分流)",
                    "→ Knowledge-Retrieval(知识库检索)",
                    "→ LLM(答案生成) → Tool(工单创建)",
                    "→ Answer(回复客户)"
                ],
                "技术亮点": [
                    "智能问题分类和路由",
                    "知识库精准检索",
                    "复杂问题自动升级",
                    "全程服务质量监控"
                ],
                "效果指标": {
                    "首次解决率": "85%+",
                    "平均处理时间": "30秒内",
                    "客户满意度": "4.6/5分",
                    "人工工作量": "减少70%+"
                }
            },
            
            "数据分析工作流": {
                "应用场景": "自动化数据收集、分析、报告生成",
                "典型节点组合": [
                    "Start → HTTP-Request(数据采集)",
                    "→ Code(数据清洗) → Loop(批量处理)",
                    "→ LLM(趋势分析) → Template-Transform(报告格式化)",
                    "→ Tool(图表生成) → Answer(报告输出)"
                ],
                "技术亮点": [
                    "多数据源自动采集",
                    "Python代码灵活处理",
                    "AI驱动的数据洞察",
                    "可视化报告自动生成"
                ],
                "业务价值": {
                    "分析效率": "提升10倍+",
                    "数据质量": "标准化程度95%+",
                    "洞察深度": "发现隐藏模式",
                    "决策支持": "实时数据驱动决策"
                }
            }
        }
    
    def get_workflow_design_best_practices(self) -> dict:
        """
        工作流设计最佳实践
        基于实际项目经验总结
        """
        return {
            "设计原则": {
                "单一职责": "每个节点专注单一功能,避免职责混乱",
                "松耦合": "节点间通过变量传递,减少直接依赖",
                "可测试性": "每个节点都可以独立测试和验证",
                "可维护性": "清晰的节点命名和文档说明"
            },
            
            "性能优化": {
                "并行设计": "识别可并行执行的节点,提升执行效率",
                "缓存策略": "对重复计算结果进行缓存",
                "资源管理": "合理配置节点的资源限制",
                "错误处理": "完善的错误处理和重试机制"
            },
            
            "用户体验": {
                "流式输出": "关键节点启用流式输出",
                "进度反馈": "显示工作流执行进度",
                "错误提示": "友好的错误信息和解决建议",
                "结果展示": "结构化的结果展示和下载"
            },
            
            "可扩展性": {
                "模块化设计": "工作流拆分为可复用的子流程",
                "参数化配置": "关键参数可配置化",
                "版本管理": "工作流版本控制和回滚",
                "A/B测试": "支持工作流的A/B测试对比"
            }
        }

# 工作流节点选择和配置指南
WORKFLOW_NODE_SELECTION_GUIDE = {
    "AI推理节点选择": {
        "LLM节点": {
            "适用场景": "文本生成、理解、推理任务",
            "配置要点": "模型选择、提示词设计、参数调优",
            "性能考虑": "令牌消耗、响应时间、并发限制",
            "最佳实践": "结构化输出、流式响应、错误处理"
        },
        
        "Agent节点": {
            "适用场景": "需要工具调用的复杂推理任务",
            "配置要点": "工具选择、推理策略、迭代次数",
            "性能考虑": "工具调用延迟、推理深度、资源消耗",
            "最佳实践": "智能工具路由、并行调用、结果缓存"
        },
        
        "Knowledge-Retrieval节点": {
            "适用场景": "知识库检索和RAG增强",
            "配置要点": "检索策略、重排序、阈值设置",
            "性能考虑": "检索延迟、精确度、召回率",
            "最佳实践": "混合检索、结果缓存、质量评估"
        }
    },
    
    "流程控制节点选择": {
        "If-Else节点": {
            "适用场景": "基于条件的分支路由",
            "配置要点": "条件表达式、分支逻辑",
            "性能考虑": "条件评估速度、分支复杂度",
            "最佳实践": "简单明确的条件、避免嵌套过深"
        },
        
        "Loop节点": {
            "适用场景": "重复性任务处理",
            "配置要点": "循环条件、退出机制、资源限制",
            "性能考虑": "循环次数、内存累积、超时控制",
            "最佳实践": "合理的循环上限、内存释放、进度监控"
        },
        
        "Iteration节点": {
            "适用场景": "列表数据的逐项处理",
            "配置要点": "迭代策略、并行度、错误处理",
            "性能考虑": "处理速度、资源占用、故障恢复",
            "最佳实践": "批量处理、并行优化、断点续传"
        }
    }
}

工作流性能基准测试: 基于实际生产环境的性能数据:

工作流类型节点数量平均执行时间并发支持成功率
简单对话流程3-5个2-5秒500+99.5%
复杂分析流程8-12个30-120秒100+97%
数据处理流程10-15个60-300秒50+95%
Agent推理流程5-8个10-60秒200+98%

4.3 状态持久化、恢复与SAGA补偿

核心目标:长流程可恢复、跨节点一致性、异常可补偿。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class WorkflowStateStore:
    """持久化运行态(示意)"""
    def save_checkpoint(self, run_id: str, node_id: str, variables: dict) -> None:
        db.upsert({"run_id": run_id, "node_id": node_id, "vars": json_dump(variables)})

    def load_checkpoint(self, run_id: str) -> dict:
        return db.query_latest(run_id) or {}

class SagaStep(NamedTuple):
    execute: Callable[[], None]
    compensate: Callable[[], None]

def run_saga(steps: list[SagaStep]) -> None:
    executed: list[SagaStep] = []
    try:
        for s in steps:
            s.execute()
            executed.append(s)
    except Exception:
        for s in reversed(executed):
            safe_call(s.compensate)
        raise

实践要点:

  • 断点续跑:每个节点“入/出”保存检查点(输入变量、输出、错误)。
  • 幂等执行:为节点生成 idempotency_key = hash(run_id,node_id,inputs);重试前先查重。
  • 补偿策略:外部副作用操作(下单/写库)必须具备 compensate
  • 事件一致性:事件持久化+事务外置队列,避免“写成功但消息丢失”。

4.4 性能优化策略

  1. 线程池管理:智能的线程池资源分配
  2. 状态管理:高效的变量池和状态管理
  3. 并行执行:支持节点并行提升执行效率
  4. 资源控制:严格的资源限制和超时控制
  5. 缓存优化:节点结果缓存和智能复用
  6. 监控体系:全链路性能监控和告警

创建时间: 2025年09月13日

本文档为Dify架构分析系列的工作流模块篇