跳转至

嵌套工作流示例

TongAgents 提供了灵活的工作流构建能力,可以将多个节点组合成子流程,实现模块化数据处理。该示例展示了如何通过子流程先将输入数值格式化,再判断奇偶性,最后对结果进行汇总统计。

环境准备

  • 安装 TongAgents SDK,参考安装
  • 配置工作流服务,参考工作流
  • 无需额外 API 调用,所有逻辑均在本地完成

代码示例

examples/workflow/sub_workflow.py
import time
from collections.abc import Iterator
from typing import Any

from tongagents.workflow.nodes.simple_node_base import (
    END,
    START,
    InputProcessMode,
    NodeConfig,
)
from tongagents.workflow.simple_workflow import Workflow, WorkflowNode

if __name__ == "__main__":
    # ----------------- 构建子工作流 -----------------
    sub_wf = Workflow()

    @sub_wf.node(name="parse_value")
    def parse_value(data: Any) -> Any:
        """
        将输入数值转换为格式化字符串
        例如,输入 3 转换为 "Value: 3"
        """
        val = f"Value: {data!s}"
        return val

    @sub_wf.node(name="annotate_parity")
    def annotate_parity(item: str) -> Any:
        """
        根据 parse_value 节点输出的字符串,
        判断原始数字的奇偶性,并追加说明:
        输出格式如 "Value: 3 -> 奇数" 或 "Value: 4 -> 偶数"
        这里假设传入数据可以转换为整数进行奇偶性判断
        """
        try:
            num = int(item.split(":")[1].strip())
            parity = "偶数" if num % 2 == 0 else "奇数"
            return f"{item} -> {parity}"
        except (ValueError, IndexError) as e:
            return f"{item} -> 无法判断奇偶性: {e}"

    # 设置子工作流节点之间的边
    sub_wf.add_edge(START, "parse_value")
    sub_wf.add_edge("parse_value", "annotate_parity")
    sub_wf.add_edge("annotate_parity", END)

    # 将子工作流封装为一个节点
    SubNode = sub_wf.as_node(
        NodeConfig(
            name="sub",
            type=WorkflowNode.__name__,
            with_context_when_called=True,
            input_process_mode=InputProcessMode.WRAP_GENERATOR_WHEN_GENERATOR_IN,
        )
    )
    # ----------------- 构建主工作流 -----------------
    global_wf = Workflow()
    global_wf.add_node(SubNode)

    # 新增的 finalize 节点,实现实用的聚合功能
    @global_wf.node(name="finalize")
    def finalize(data: Iterator[str]) -> Iterator[Any]:
        """
        finalize 节点从子流程输出的字符串中解析数字和奇偶性,
        分别统计奇数和偶数的个数及平均值,并输出聚合统计结果。
        预期输入格式示例:"Value: 9 -> 奇数"
        """
        odd_numbers = []
        even_numbers = []
        for item in data:
            try:
                # 预期格式:"Value: 9 -> 奇数"
                parts = item[0].split("->")
                if len(parts) != 2:  # noqa: PLR2004
                    print(f"格式不符: {item[0]}")
                    continue
                num_str = parts[0].replace("Value:", "").strip()
                num = int(num_str)
                parity = parts[1].strip()
                if parity == "奇数":
                    odd_numbers.append(num)
                elif parity == "偶数":
                    even_numbers.append(num)
                else:
                    print(f"未知的奇偶性: {item[0]}")
            except (ValueError, IndexError) as e:
                print(f"解析失败: {item[0]}, 错误: {e}")
                continue

        def avg(nums):
            return sum(nums) / len(nums) if nums else 0

        total_count = len(odd_numbers) + len(even_numbers)
        result = (
            f"聚合结果 -> 总数量: {total_count}; "
            f"奇数: 数量 = {len(odd_numbers)}, 平均 = {avg(odd_numbers):.2f}; "
            f"偶数: 数量 = {len(even_numbers)}, 平均 = {avg(even_numbers):.2f}"
        )
        yield result

    # 设置主工作流边:START -> sub -> finalize -> END
    global_wf.add_edge(START, SubNode.config.name)
    global_wf.add_edge(SubNode.config.name, "finalize")
    global_wf.add_edge("finalize", END)

    def generate():
        for i in range(10):
            time.sleep(1)
            yield i
        time.sleep(1)

    print("主Workflow开始运行:")
    resp = global_wf.stream(generate())
    for itr in resp:
        print(f"result: {itr}")

说明

  • 本示例通过子流程实现了将输入数字先格式化,再判断奇偶,最后汇总统计的功能。
  • 子流程通过 as_node 封装为一个节点,可直接嵌入主工作流,实现模块化设计,并提高工作流的复用性。

使用本示例,你可以快速了解如何构建和集成子流程,从而实现复杂的数据处理与统计功能。