跳转至

Workflow

通过恰当的提示词、工具、模型、知识库、记忆等能力构建的Agent,可以帮助用户用最短的时间构建一个功能完备的AI应用。

但当用户的需求更加复杂时,或者通过特定逻辑编排可以获得更好的效果时,单个Agent可能不是最佳选择。此时,用户可以通过组合多个逻辑节点/Agent来构建一个更复杂的AI应用。

TongAgents提供了一个通用的workflow编排框架:

  • 开箱即用:一系列实用的nodeAgent
  • 灵活扩展:用户可以实现自己的nodeAgent,并编排到workflow中
  • 链路可视化:开发者可以方便的查看workflow的执行链路,并进行调试

2.0 架构说明

workflow 为 tongagents 企业版能力,底层工具/MCP/运行环境由开源 chuang_agent 提供。2.0 引入 WorkFlowEnv,用于在环境级别管理 workflow 生命周期与工具调用。

你真的需要workflow吗?

TongAgents为不同程度的复杂性提供了不同的解决方案:

  • 如果你只是需要一个简单的、单个Agent的AI应用,请参考Agent
  • Agent难以支撑业务的复杂度时,请使用workflow,但尽可能将复杂度控制在nodeAgent内部。
  • 最后,workflow提供command协议用于提供超出常规pipeline的复杂能力,请参考Command

请不必提前担心业务的复杂度会逐步提升,TongAgents框架支持从单个Agent逐步迁移到复杂的workflow。当遇到问题再重构也不迟,而且非常简单。

WorkFlowEnv

TongAgents 2.0 提供 WorkFlowEnv,基于 chuang_agent 的 DefaultEnvironment。在企业场景中,可以通过 WorkFlowEnv 统一管理 workflow 的创建、运行、工具与 MCP 调度,并复用沙箱与工作区能力。

编排与执行

TongAgent提供了一系列开箱即用的nodeAgent,用户可以通过组合这些节点来构建复杂的workflow。

workflow的编排有三种方式:

  1. 通过workflow实例的add_nodeadd_edge方法
  2. 继承Workflow,并通过@node_declare装饰器定义节点和边
  3. 通过WorkflowConfig配置化构建workflow

对于简单使用workflow,可直接使用add_nodeadd_edge方法。对于可能需要继承和复用的场景,可继承Workflow,并使用@node_declare装饰器。当需要持久化workflow时,可使用配置化的WorkflowConfig方式。

以一个2节点的工作流为例

STARTweb_search(搜索)llm(大模型)END用户输入,进行搜索将用户输入和检索结果送到大模型处理大模型处理结果输出
STARTweb_search(搜索)llm(大模型)END用户输入,进行搜索将用户输入和检索结果送到大模型处理大模型处理结果输出

方式一:add_nodeadd_edge

Workflow实例提供了add_nodeadd_edge方法,用于添加节点和边。

from tongagents.nodes.llm_node import LLMNode, LLMNodeConfig
from tongagents.workflow.nodes.input_node import InputNode, InputNodeConfig
from tongagents.workflow.simple_workflow import END, START, Workflow

llm_node = LLMNode(
    LLMNodeConfig(
        name="llm_node",
        type="llm",
        model={
            "model_provider": 1,
            "model_name": "gpt-3.5-turbo",
            "url": "https://api.openai.com/v1/chat/completions",
            "api_key": "your-api-key",
            "n": 1,
            "stream": False,
        },
        prompt_template=[{"role": "system", "text": "用户的问题是:{{#sys.query#}}"}],
    )
)

input_node = InputNode(
    InputNodeConfig(
        name="input",
        type="input",
    )
)

workflow = Workflow()
workflow.add_node(
    llm_node,
)

workflow.add_node(
    input_node,
)

workflow.add_edge(START, input_node.config.name)
workflow.add_edge(input_node.config.name, llm_node.config.name)
workflow.add_edge(llm_node.config.name, END)


result = workflow.step("你好啊")
print(result)
"""
你好呀!有什么我能帮忙的事情尽管跟我说。
"""

方式二:@node_declare

@node_declare装饰器用于定义节点和边,并将其添加到workflow中。

from tongagents.workflow.nodes.simple_node_base import END, START
from tongagents.workflow.simple_workflow import Workflow, node_declare


class DemoWorkflow(Workflow):
    @node_declare(name="web_search", edges=[(START, "web_search")])
    def web_search(self, event):
        return self._search(event)

    @node_declare(name="llm", edges=[("web_search", "llm"),("llm", END)])
    def llm(self, event):
        return self._llm_process(event)
当构建一个DemoWorkflow实例时,其效果等同于:

workflow = DemoWorkflowAgent()
workflow.add_node(workflow.web_search)
workflow.add_node(workflow.llm)
workflow.add_edge(START, "web_search")
workflow.add_edge("web_search", "llm")
workflow.add_edge("llm", END)

方式三:配置化构建

Workflow提供了一个create方法,用于配置化构建workflow

一个json类型的配置文件示例:

{
    "nodes": [
        {
            "name": "llm123123",
            "type": "LLMNode",
            "model": {
                "model_provider": 1,
                "model_name": "fake_model",
                "url": "fake_url",
                "api_key": "fake_api",
                "n": 1,
                "stream": false
            },
            "prompt_template": [
                {
                    "role": "system",
                    "text": "{{#sys.query#}}",
                    "id": "2715def2-fa84-4360-a139-772c475b9801"
                }
            ]
        },
        {
            "name": "output123123",
            "type": "OutputNode",
            "answer": "这是输出节点的模板: {{#llm123123.text#}}"
        }
    ],
    "edges": [
        {
            "source": "llm123123",
            "destination": "output123123"
        }
    ]
}
假设将这个配置存放在workflow_config.json文件中,则可以通过以下方式构建workflow

from tongagents.workflow.simple_workflow import Workflow


with open("workflow_config.json", encoding="utf-8") as f:
    workflow_config=f.read().strip()

workflow = Workflow.create(workflow_config)
workflow.step("你好")

Node

workflow中可以作为节点的对象包括:

  • function:一个函数,输入为EventEvent的迭代器,输出为EventEvent的迭代器
  • NodeBase的子类:基于NodeBase的节点
  • agent:自主规划的Agent可以被视作一个节点
  • workflowworkflow中可以包含其他workflow,并将其视为内部的节点

开箱即用的节点

我们提供了一些常用的节点,开发者可以参考下表:

分类 节点类型 说明
基础 llm 基于LLM的节点
基础 output 输出节点
基础 input 输入节点
基础 http HTTP请求节点
基础 if_else 条件判断节点
基础 intent_recognition 意图识别节点
工具 tool 基于工具的节点
具身 mm_info_extract_node 多模态信息提取
具身 embody_command_node 具身指令执行

其中,工具类包括:

  • amap_weather:高德地图天气查询
  • python_code_interpreter:代码解释器
  • image_understanding:图像理解
  • long_document_understanding:长文档理解
  • web_crawler:网页爬虫
  • web_search:网页搜索

等任意本地、远程、知识库、以及用户自定义的工具。

输入节点

输入节点用于接收用户输入。

JSON配置示例:

{
    "name": "input",
    "type": "input",
    "with_context_when_called": true
}

代码实现示例:

from tongagents.workflow.nodes.input_node import InputNode, InputNodeConfig

input_node = InputNode(
    InputNodeConfig(
        name="input",
        type="input",
        with_context_when_called=True,
    )
)

workflow.add_node("input", input_node)

HTTP节点

HTTP节点用于发送HTTP/HTTPS请求。

JSON配置示例:

{
    "name": "http_node",
    "type": "http",
    "url": "https://api.example.com/test",
    "method": "POST",
    "headers": {
        "Content-Type": "application/json"
    },
    "body": {
        "test": "data"
    },
    "timeout": 30,
    "verify_ssl": true,
    "response_key": "http_response"
}

代码实现示例:

from tongagents.nodes.http_node import HTTPNode, HTTPNodeConfig

http_node = HTTPNode(
    HTTPNodeConfig(
        name="http_node",
        type="http",
        url="https://api.example.com/test",
        method="POST",
        headers={"Content-Type": "application/json"},
        body={"test": "data"},
        timeout=30,
        verify_ssl=True,
        response_key="http_response"
    )
)

workflow.add_node("http_node", http_node)

条件判断节点

条件判断节点用于根据条件选择不同的执行路径。

JSON配置示例:

{
    "name": "if_else",
    "type": "if_else",
    "condition": "{{#input.text#}} == 'yes'",
    "true_branch": "output_yes",
    "false_branch": "output_no"
}

代码实现示例:

from tongagents.workflow.nodes.ifelse_node import IfElseNode, IfElseNodeConfig

if_else_node = IfElseNode(
    IfElseNodeConfig(
        name="if_else",
        type="if_else",
        condition="{{#input.text#}} == 'yes'",
        true_branch="output_yes",
        false_branch="output_no"
    )
)

workflow.add_node("if_else", if_else_node)

意图识别节点

意图识别节点用于识别用户输入的意图。

JSON配置示例:

{
    "name": "intent_recognition",
    "type": "intent_recognition",
    "model": {
        "model_provider": 1,
        "model_name": "gpt-3.5-turbo",
        "url": "https://api.openai.com/v1/chat/completions",
        "api_key": "your-api-key",
        "n": 1,
        "stream": false
    },
    "intent_mappings": {
        "greeting": "greeting_node",
        "farewell": "farewell_node",
        "general": "general_node"
    },
    "default_node": "general_node",
    "query": "{{#sys.query#}}"
}

代码实现示例:

from tongagents.nodes.intent_recognition_node import IntentRecognitionNode, IntentRecognitionNodeConfig
from tongagents.agents.llm.base import ModelConfig

intent_node = IntentRecognitionNode(
    IntentRecognitionNodeConfig(
        name="intent_recognition",
        type="intent_recognition",
        with_context_when_called=True,
        model=ModelConfig(
            model_provider=1,
            model_name="gpt-3.5-turbo",
            url="https://api.openai.com/v1/chat/completions",
            api_key="your-api-key",
            n=1,
            stream=False
        ),
        intent_mappings={
            "greeting": "greeting_node",
            "farewell": "farewell_node",
            "general": "general_node"
        },
        default_node="general_node",
        query="{{#sys.query#}}"
    )
)

workflow.add_node("intent_recognition", intent_node)

LLM节点

LLM节点是workflow中最重要的节点之一,用于执行LLM任务。

JSON配置示例:

{
    "name": "llm_node",
    "type": "llm",
    "model": {
        "model_provider": 1,
        "model_name": "gpt-3.5-turbo",
        "url": "https://api.openai.com/v1/chat/completions",
        "api_key": "your-api-key",
        "n": 1,
        "stream": false
    },
    "prompt_template": [
        {
            "role": "system",
            "text": "web检索的结果是:{{#web_search.tool_result#}}用户的问题是:{{#sys.query#}}"
        }
    ]
}

代码实现示例:

from tongagents.nodes.llm_node import LLMNode, LLMNodeConfig

llm_node = LLMNode(
    LLMNodeConfig(
        name="llm_node",
        type="llm",
        model={
            "model_provider": 1,
            "model_name": "gpt-3.5-turbo",
            "url": "https://api.openai.com/v1/chat/completions",
            "api_key": "your-api-key",
            "n": 1,
            "stream": False
        },
        prompt_template=[
            {
                "role": "system",
                "text": "web检索的结果是:{{#web_search.tool_result#}}用户的问题是:{{#sys.query#}}"
            }
        ]
    )
)

workflow.add_node("llm_node", llm_node)

工具节点

工具节点用于执行工具任务。

JSON配置示例:

{
    "name": "web_search",
    "type": "tool",
    "tool_name": "web_search",
    "tool_args": {
        "query": "{{#sys.query#}}"
    }
}

代码实现示例:

from tongagents.workflow.nodes.tool_node import ToolNode, ToolNodeConfig

tool_node = ToolNode(
    ToolNodeConfig(
        name="web_search",
        type="tool",
        tool_name="web_search",
        tool_args={
            "query": "{{#sys.query#}}"
        }
    )
)

workflow.add_node("web_search", tool_node)

输出节点

输出节点用于输出结果。

JSON配置示例:

{
    "name": "output",
    "type": "output",
    "answer": "{{#llm_node.text#}}",
    "return_dict": true
}

代码实现示例:

from tongagents.workflow.nodes.output_node import OutputNode, OutputNodeConfig

output_node = OutputNode(
    OutputNodeConfig(
        name="output",
        type="output",
        answer="{{#llm_node.text#}}",
        return_dict=True
    )
)

workflow.add_node("output", output_node)

节点连接

节点之间通过边(edge)连接,定义执行流程。

JSON配置示例:

{
    "edges": [
        {
            "source": {
                "cell": "input",
                "port": "out.input"
            },
            "target": {
                "cell": "llm_node",
                "port": "in.llm_node"
            }
        },
        {
            "source": {
                "cell": "llm_node",
                "port": "out.llm_node"
            },
            "target": {
                "cell": "output",
                "port": "in.output"
            }
        }
    ]
}

代码实现示例:

from tongagents.workflow.workflow import START, END

# 添加边
workflow.add_edge(START, "input")
workflow.add_edge("input", "llm_node")
workflow.add_edge("llm_node", "output")
workflow.add_edge("output", END)

其他节点可参考具体的代码实现和接口文档,这里不再赘述。

自定义节点

基于function的节点

import requests

from tongagents.agents.llm.messages import UserPromptMessage
from tongagents.workflow.simple_workflow import END, START, Workflow


def http_request(query: str) -> dict:
    response = requests.request(
        "GET", "https://api.example.com/search", params={"q": query}
    )
    return response.json()


workflow = Workflow()
workflow.add_node(
    http_request,
    NodeConfig(
        name = "http_request",
        type = "DecroteNode",
        with_context_when_called = True
    )
)
workflow.add_edge(START, "http_request")
workflow.add_edge("http_request", END)

result = workflow.step(UserPromptMessage("哪吒2票房是多少?"))
print(result)

function类型的节点适用于一些简单的、无需复用的逻辑,适合用于快速构建简单的workflow。

继承NodeBase的节点

  1. 定义NodeConfig
  2. 继承NodeBase,并实现run方法

from typing import Any, Iterator

import requests

from tongagents.workflow.nodes.simple_node_base import NodeBase, NodeConfig
from tongagents.workflow.simple_workflow import RunContext, node_declare


class APINodeConfig(NodeConfig):
    url: str
    method: str
    headers: dict
    body: dict


class APINode(NodeBase):
    def __init__(self, config: NodeConfig):
        print(f"init llm node, config: {config}")
        if not isinstance(config, APINodeConfig):
            raise ValueError("config must be a APINodeConfig")
        super().__init__(config)

    @node_declare()
    def run(self, events: Iterator[Any], context: RunContext = None) -> Iterator[Any]:
        for event in events:
            # 发送请求
            response = requests.request(self.config.method, self.config.url, headers=self.config.headers, data=self.config.body)
            # 返回响应
            yield response
上例中给出了一个HTTP请求的节点,并且可以灵活的在不同的workflow中复用。

NodeConfig 定义了节点运行时的配置,从而可以实现节点复用。

NodeBase#run 定义了节点运行时的逻辑。

数据处理模式

在TongAgents workflow框架中,数据处理模式是一个关键概念,它定义了节点如何处理来自多个上游节点的输入事件。引入数据处理模式主要基于以下考虑:

  1. 数据流调度机制:在默认情况下,workflow的调度遵循事件驱动模型。对于具有多个上游输入的节点,一旦任何一个输入事件到达,该节点便可能被触发执行。这种机制虽然能够提高并行处理效率,但在某些业务场景下可能无法满足需求。
  2. 节点处理语义差异:不同业务场景下的节点对输入数据的处理需求各不相同。例如:

    • 某些节点虽然连接了多个上游数据源,但这些数据源产生的是结构相同的数据,节点只需要简单地对每个事件单独处理。
    • 而另一些节点则需要汇聚所有上游数据,形成一个完整的上下文后再进行综合处理和决策。

通过配置适当的数据处理模式,开发者可以精确控制节点的输入处理行为,实现更加复杂和灵活的数据流编排。

MERGE_EVENTS_AS_EVENT_MAP

MERGE_EVENTS_AS_EVENT_MAP 是一个特殊的数据处理模式,它允许一个节点等待所有上游依赖节点的数据都准备好后,再进行处理。这在需要整合多个数据源的场景中特别有用。

输入: 节点X: 事件A
      节点Y: 事件B
  │       │
  └───┬───┘
[创建事件映射]
输出: 事件字典 {
  "节点X": 事件A,
  "节点Y": 事件B
}
以下是一个旅游信息助手的例子,它需要同时获取天气、景点和酒店信息,然后统一进行处理:

from collections.abc import Iterator

from tongagents.base_agent import WorkflowAgent
from tongagents.workflow.nodes.simple_node_base import END, START, InputProcessMode
from tongagents.workflow.simple_workflow import node_declare


class AggregationAgent(WorkflowAgent):
    def __init__(self):
        super().__init__()

    @node_declare(
        name="weather_node",
        edges=[(START, "weather_node")],
    )
    def weather_node(self, event) -> str:
        if event == "杭州":
            return "晴天"
        return "多云"

    @node_declare(
        name="attraction_node",
        edges=[(START, "attraction_node")],
    )
    def attraction_node(self, event) -> str:
        if event == "杭州":
            return "西湖"
        return "天安门"

    @node_declare(
        name="hotel_node",
        edges=[(START, "hotel_node")],
    )
    def hotel_node(self, event) -> str:
        if event == "杭州":
            return "ABC酒店"
        return "DEF酒店"

    @node_declare(
        name="travel_advisor",
        edges=[
            ("weather_node", "travel_advisor"),
            ("attraction_node", "travel_advisor"),
            ("hotel_node", "travel_advisor"),
            ("travel_advisor", END),
        ],
        input_process_mode=InputProcessMode.MERGE_EVENTS_AS_EVENT_MAP,
    )
    def travel_advisor(self, event) -> Iterator[str]:
        return f"今天天气{event['weather_node']},去{event['attraction_node']},住在{event['hotel_node']}"


if __name__ == "__main__":
    agent = AggregationAgent()

    output = [item for item in agent.stream("杭州")]
    print(output)
    output = [item for item in agent.stream("北京")]
    print(output)

"""
['今天天气晴天,去西湖,住在ABC酒店']
['今天天气多云,去天安门,住在DEF酒店']
"""

在这个例子中,travel_advisor 节点使用了 MERGE_EVENTS_AS_EVENT_MAP 模式,它会等待天气、景点和酒店三个数据源节点的结果全部准备好后,才会执行。这确保了LLM可以基于完整的信息生成全面的旅游建议,而不是基于部分信息生成不完整的建议。

这种模式特别适用于以下场景:

  • 需要整合多个异步数据源的信息
  • 所有数据源的信息都是必需的,缺一不可
  • 需要基于完整信息做出决策或生成内容

调试模式

TongAgents workflow框架提供了强大的调试功能,帮助开发者深入了解workflow的执行过程。调试模式能够透出节点执行的全生命周期状态,支持同步和异步两种执行模式,同时保持原有工作流执行逻辑不变。

启用调试模式

在执行workflow时,通过设置debug=True参数即可启用调试模式。调试模式下,执行方法会返回两个生成器:

  • event_generator: 原始工作流执行结果
  • debug_generator: 调试信息流
# 同步执行模式
resp, debug_resp = workflow.run(input_generator(), debug=True)
resp, debug_resp = workflow.stream("你好", debug=True)

# 异步执行模式
resp, debug_resp = workflow.arun(input_generator(), debug=True)
resp, debug_resp = workflow.astream("你好", debug=True)

调试事件类型

调试模式会产生四种类型的调试事件,涵盖节点执行的完整生命周期:

事件类型 说明 触发时机
node_start 节点开始执行 节点准备开始处理输入事件时
node_process 节点处理中间结果 节点正在处理事件时(可选)
node_end 节点执行完成 节点成功完成处理时
node_exception 节点执行异常 节点执行过程中发生异常时

调试事件结构

每个调试事件都遵循统一的DebugEvent结构:

class DebugEvent:
    type: str        # 事件类型
    data: dict       # 事件相关数据

不同类型的调试事件包含的数据字段如下:

node_start/node_end/node_process

{
    "node_name": "producer",      # 节点名称
    "in_event": {...},           # 输入事件数据
    "timestamp": "2024-01-01...", # 时间戳(可选)
}

node_exception

{
    "node_name": "consumer",      # 节点名称
    "error_info": "测试异常",     # 异常信息
    "timestamp": "2024-01-01...", # 时间戳(可选)
}

基本使用示例

以下是一个完整的调试模式使用示例:

from tongagents.base_agent import WorkflowAgent
from tongagents.entity import Event
from tongagents.workflow.nodes.simple_node_base import END, START
from tongagents.workflow.simple_workflow import RunContext, node_declare


class DebugWorkflowAgent(WorkflowAgent):
    @node_declare(
        name="producer",
        edges=[(START, "producer"), ("producer", "consumer")],
    )
    def producer(self, reqs):
        return Event(hello="你好")

    @node_declare(
        name="consumer",
        edges=[("consumer", END)],
        with_context_when_called=True,
    )
    def consumer(self, event, context: RunContext[dict]):
        return Event(hello="你好2")


# 创建workflow实例
workflow = DebugWorkflowAgent()

# 启用调试模式执行
def input_generator():
    yield "用户输入"

resp, debug_resp = workflow.run(input_generator(), debug=True)

# 处理执行结果
for result in resp:
    print(f"执行结果: {result}")

# 处理调试信息
for debug_event in debug_resp:
    print(f"调试事件: {debug_event.type} - {debug_event.data}")

"""
执行结果: description=None event_happen_time='2025-05-30 16:59:13' hello='你好2' data_type='Event'
调试事件: node_start - {'node_name': 'producer', 'in_event': '用户输入'}
调试事件: node_end - {'node_name': 'producer', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好', data_type='Event')}
调试事件: node_start - {'node_name': 'consumer', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好', data_type='Event')}
调试事件: node_end - {'node_name': 'consumer', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好2', data_type='Event')}
调试事件: node_start - {'node_name': '__end__', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好2', data_type='Event')}
调试事件: node_end - {'node_name': '__end__', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好2', data_type='Event')}
"""

异常调试

当节点执行过程中发生异常时,调试模式会捕获异常信息并生成node_exception事件:

class ExceptionWorkflowAgent(WorkflowAgent):
    @node_declare(
        name="error_node",
        edges=[(START, "error_node"), ("error_node", END)],
    )
    def error_node(self, event):
        raise Exception("模拟异常")
        return Event(result="不会执行到这里")


workflow = ExceptionWorkflowAgent()
resp, debug_resp = workflow.stream("测试", debug=True)

try:
    # 处理执行结果(会抛出异常)
    for result in resp:
        print(result)
except Exception as e:
    print(f"捕获到异常: {e}")

# 查看调试信息中的异常事件
for debug_event in debug_resp:
    if debug_event.type == "node_exception":
        print(f"节点异常: {debug_event.data['node_name']} - {debug_event.data['error_info']}")
"""
捕获到异常: 模拟异常
节点异常: error_node - 模拟异常
"""

异步调试模式

对于异步workflow,调试模式的使用方式类似,但需要使用异步迭代:

import asyncio

class AsyncWorkflowAgent(WorkflowAgent):
    @node_declare(
        name="async_producer",
        edges=[(START, "async_producer"), ("async_producer", END)],
    )
    async def async_producer(self, event):
        await asyncio.sleep(0.1)  # 模拟异步操作
        return Event(result="异步处理完成")


async def main():
    workflow = AsyncWorkflowAgent()

    # 异步调试模式
    resp, debug_resp = workflow.astream("测试", debug=True)

    # 处理执行结果
    async for result in resp:
        print(f"异步结果: {result}")

    # 处理调试信息
    async for debug_event in debug_resp:
        print(f"异步调试事件: {debug_event.type} - {debug_event.data}")

# 运行异步示例
asyncio.run(main())
"""
异步结果: description=None event_happen_time='2025-05-30 17:01:51' result='异步处理完成' data_type='Event'
异步调试事件: node_start - {'node_name': 'async_producer', 'in_event': '测试'}
异步调试事件: node_end - {'node_name': 'async_producer', 'in_event': Event(description=None, event_happen_time='2025-05-30 17:01:51', result='异步处理完成', data_type='Event')}
异步调试事件: node_start - {'node_name': '__end__', 'in_event': Event(description=None, event_happen_time='2025-05-30 17:01:51', result='异步处理完成', data_type='Event')}
异步调试事件: node_end - {'node_name': '__end__', 'in_event': Event(description=None, event_happen_time='2025-05-30 17:01:51', result='异步处理完成', data_type='Event')}
"""

Human in the Loop

TongAgents框架提供了优雅的Human in the Loop机制,允许AI工作流在执行过程中暂停并等待人类输入,然后继续执行。这种机制具有以下特点:

  • 多次中断支持:在单个节点中可以多次请求人类输入
  • 幂等性保证:支持重试机制,确保执行的一致性
  • 状态管理:优雅的中断状态管理和恢复机制
  • 简单易用:通过简单的interrupt()函数调用即可实现人机交互

核心概念

中断机制

interrupt()函数用于在workflow节点执行过程中暂停执行,等待人类输入:

from tongagents.workflow.nodes.simple_node_base import interrupt

def consumer(self, event, context: RunContext[dict]):
    # 第一次中断,等待人类输入
    res1 = interrupt("请提供第一个参数", context, "consumer", event)

    # 第二次中断,等待人类输入
    res2 = interrupt("请提供第二个参数", context, "consumer", event)

    return Event(result=res1 + res2)

恢复机制

使用ResumeCommand来响应中断并提供人类输入:

from tongagents.workflow.commands.entrance_takeover_command import ResumeCommand

# 响应中断
resp = workflow.step(ResumeCommand(human_input="用户输入的内容"))

基本使用示例

以下是一个完整的Human in the Loop使用示例:

from collections.abc import Iterator
from tongagents.base_agent import WorkflowAgent
from tongagents.entity import Event
from tongagents.workflow.commands.entrance_takeover_command import ResumeCommand
from tongagents.workflow.nodes.simple_node_base import END, START, interrupt
from tongagents.workflow.simple_workflow import RunContext, node_declare


class InteractiveWorkflow(WorkflowAgent):
    @node_declare(
        name="producer",
        edges=[(START, "producer"), ("producer", "consumer")],
    )
    def producer(self, reqs) -> Iterator[str]:
        return Event(hello="开始处理")

    @node_declare(
        name="consumer",
        edges=[("consumer", END)],
        with_context_when_called=True,
    )
    def consumer(self, event, context: RunContext[dict]) -> Iterator[str]:
        # 第一次中断
        res1 = interrupt("请输入第一个参数", context, "consumer", event)

        # 第二次中断
        res2 = interrupt("请输入第二个参数", context, "consumer", event)

        return Event(result=f"处理结果: {res1} + {res2}")


# 创建workflow实例
workflow = InteractiveWorkflow()

# 第一次执行,触发第一个中断
resp = workflow.step("开始")
print("[1]", resp)  # 输出: 请输入第一个参数

# 响应第一次中断,触发第二个中断
resp = workflow.step(ResumeCommand(human_input="参数1"))
print("[2]", resp)  # 输出: 请输入第二个参数

# 响应第二次中断,完成执行
resp = workflow.step(ResumeCommand(human_input="参数2"))
print("[3]", resp)  # 输出: Event(result="处理结果: 参数1 + 参数2")

多次中断的执行流程

当节点中包含多次interrupt()调用时,执行流程如下:

时间线 ------>

第一次执行:
interrupt_id=0 ----[中断]----> 等待人类输入
                              |
第二次执行(恢复):              v
interrupt_id=0 ----[返回缓存]---> interrupt_id=1 ----[中断]----> 等待人类输入
                                                               |
第三次执行(恢复):                                              v
interrupt_id=0 ----[返回缓存]---> interrupt_id=1 ----[返回缓存]---> 继续执行完成

幂等性机制

TongAgents的中断机制具有重要的幂等性特征:

执行恢复点

重要:当workflow从中断恢复时,执行是从中断节点的第一行代码开始,而不是阻塞在interrupt()调用处。这意味着:

def consumer(self, event, context: RunContext[dict]):
    print("节点开始执行")  # 每次恢复都会重新执行这一行

    res1 = interrupt("第一次中断", context, "consumer", event)
    print(f"获得第一次输入: {res1}")  # 恢复后才会执行这一行

    res2 = interrupt("第二次中断", context, "consumer", event)
    print(f"获得第二次输入: {res2}")  # 最终恢复后才会执行这一行

    return Event(result=res1 + res2)

中断计数器

框架使用interrupt_counter来确保幂等性:

  • 每次进入节点时重置计数器
  • 每次调用interrupt()时计数器递增
  • 重试时通过计数器判断是否返回缓存的输入

输入事件一致性

框架保证多次恢复时的输入event保持一致,确保节点逻辑的可预测性。

循环中的中断

支持在for循环中调用interrupt(),但需要注意以下限制:

def consumer(self, event, context: RunContext[dict]):
    results = []

    # ✅ 推荐:循环次数固定且取决于输入event
    items = event.get("items", [])  # 根据输入确定循环次数
    for i, item in enumerate(items):
        user_input = interrupt(f"请处理项目 {item}", context, "consumer", event)
        results.append(user_input)

    return Event(results=results)

注意事项

  1. 避免随机循环次数:循环次数应该是确定的,基于输入event计算得出
  2. 避免条件依赖中断结果:不要让循环的继续条件依赖于中断的返回值
# ❌ 不推荐:循环次数不确定
def bad_example(self, event, context: RunContext[dict]):
    while True:
        user_input = interrupt("继续吗?", context, "consumer", event)
        if user_input == "停止":
            break  # 这会导致重试时行为不一致

高级用法示例

条件中断

def conditional_consumer(self, event, context: RunContext[dict]):
    result = event.get("auto_process", False)

    if not result:
        # 只有在需要时才中断
        user_decision = interrupt("是否继续处理?", context, "consumer", event)
        if user_decision.lower() != "yes":
            return Event(status="cancelled")

    # 继续正常处理
    return Event(status="completed")

数据验证中断

def validation_consumer(self, event, context: RunContext[dict]):
    data = event.get("data")

    # 验证数据
    if not self.validate_data(data):
        corrected_data = interrupt(
            f"数据验证失败: {data}。请提供正确的数据",
            context,
            "consumer",
            event
        )
        data = corrected_data

    # 处理验证后的数据
    return Event(processed_data=self.process(data))

通过Human in the Loop机制,TongAgents能够在AI自动化和人类智慧之间建立有效的桥梁,实现更加灵活和可控的工作流执行。

Command

Commandworkflow的扩展能力,用于支持一些超出常规pipeline的复杂能力。

Node 可以通过yield 返回Command,从而实现Node之间的特殊数据交互逻辑。

目前支持的Command包括:

  • TransferCommand: 将当前节点的事件转移给另一个节点
  • EntranceTakeOverCommand: 将当前节点的事件转移给另一个节点,并接管整个流程
  • EntranceTakeOffCommand: 将当前节点的事件转移给另一个节点,并停止当前节点的执行
  • RestartNodeCommand: 重新执行当前节点