嵌套工作流示例¶
TongAgents 提供了灵活的工作流构建能力,可以将多个节点组合成子流程,实现模块化数据处理。该示例展示了如何通过子流程先将输入数值格式化,再判断奇偶性,最后对结果进行汇总统计。
环境准备¶
代码示例¶
examples/workflow/sub_workflow.py
import time
from collections.abc import Iterator
from typing import Any
from tongagents.workflow.nodes.simple_node_base import (
END,
START,
InputProcessMode,
NodeConfig,
)
from tongagents.workflow.simple_workflow import Workflow, WorkflowNode
if __name__ == "__main__":
# ----------------- 构建子工作流 -----------------
sub_wf = Workflow()
@sub_wf.node(name="parse_value")
def parse_value(data: Any) -> Any:
"""
将输入数值转换为格式化字符串
例如,输入 3 转换为 "Value: 3"
"""
val = f"Value: {data!s}"
return val
@sub_wf.node(name="annotate_parity")
def annotate_parity(item: str) -> Any:
"""
根据 parse_value 节点输出的字符串,
判断原始数字的奇偶性,并追加说明:
输出格式如 "Value: 3 -> 奇数" 或 "Value: 4 -> 偶数"
这里假设传入数据可以转换为整数进行奇偶性判断
"""
try:
num = int(item.split(":")[1].strip())
parity = "偶数" if num % 2 == 0 else "奇数"
return f"{item} -> {parity}"
except (ValueError, IndexError) as e:
return f"{item} -> 无法判断奇偶性: {e}"
# 设置子工作流节点之间的边
sub_wf.add_edge(START, "parse_value")
sub_wf.add_edge("parse_value", "annotate_parity")
sub_wf.add_edge("annotate_parity", END)
# 将子工作流封装为一个节点
SubNode = sub_wf.as_node(
NodeConfig(
name="sub",
type=WorkflowNode.__name__,
with_context_when_called=True,
input_process_mode=InputProcessMode.WRAP_GENERATOR_WHEN_GENERATOR_IN,
)
)
# ----------------- 构建主工作流 -----------------
global_wf = Workflow()
global_wf.add_node(SubNode)
# 新增的 finalize 节点,实现实用的聚合功能
@global_wf.node(name="finalize")
def finalize(data: Iterator[str]) -> Iterator[Any]:
"""
finalize 节点从子流程输出的字符串中解析数字和奇偶性,
分别统计奇数和偶数的个数及平均值,并输出聚合统计结果。
预期输入格式示例:"Value: 9 -> 奇数"
"""
odd_numbers = []
even_numbers = []
for item in data:
try:
# 预期格式:"Value: 9 -> 奇数"
parts = item[0].split("->")
if len(parts) != 2: # noqa: PLR2004
print(f"格式不符: {item[0]}")
continue
num_str = parts[0].replace("Value:", "").strip()
num = int(num_str)
parity = parts[1].strip()
if parity == "奇数":
odd_numbers.append(num)
elif parity == "偶数":
even_numbers.append(num)
else:
print(f"未知的奇偶性: {item[0]}")
except (ValueError, IndexError) as e:
print(f"解析失败: {item[0]}, 错误: {e}")
continue
def avg(nums):
return sum(nums) / len(nums) if nums else 0
total_count = len(odd_numbers) + len(even_numbers)
result = (
f"聚合结果 -> 总数量: {total_count}; "
f"奇数: 数量 = {len(odd_numbers)}, 平均 = {avg(odd_numbers):.2f}; "
f"偶数: 数量 = {len(even_numbers)}, 平均 = {avg(even_numbers):.2f}"
)
yield result
# 设置主工作流边:START -> sub -> finalize -> END
global_wf.add_edge(START, SubNode.config.name)
global_wf.add_edge(SubNode.config.name, "finalize")
global_wf.add_edge("finalize", END)
def generate():
for i in range(10):
time.sleep(1)
yield i
time.sleep(1)
print("主Workflow开始运行:")
resp = global_wf.stream(generate())
for itr in resp:
print(f"result: {itr}")
说明¶
- 本示例通过子流程实现了将输入数字先格式化,再判断奇偶,最后汇总统计的功能。
- 子流程通过
as_node封装为一个节点,可直接嵌入主工作流,实现模块化设计,并提高工作流的复用性。
使用本示例,你可以快速了解如何构建和集成子流程,从而实现复杂的数据处理与统计功能。