Workflow¶
通过恰当的提示词、工具、模型、知识库、记忆等能力构建的Agent,可以帮助用户用最短的时间构建一个功能完备的AI应用。
但当用户的需求更加复杂时,或者通过特定逻辑编排可以获得更好的效果时,单个Agent可能不是最佳选择。此时,用户可以通过组合多个逻辑节点/Agent来构建一个更复杂的AI应用。
TongAgents提供了一个通用的workflow编排框架:
- 开箱即用:一系列实用的
node和Agent - 灵活扩展:用户可以实现自己的
node和Agent,并编排到workflow中 - 链路可视化:开发者可以方便的查看workflow的执行链路,并进行调试
2.0 架构说明
workflow 为 tongagents 企业版能力,底层工具/MCP/运行环境由开源 chuang_agent 提供。2.0 引入 WorkFlowEnv,用于在环境级别管理 workflow 生命周期与工具调用。
你真的需要workflow吗?
TongAgents为不同程度的复杂性提供了不同的解决方案:
- 如果你只是需要一个简单的、单个Agent的AI应用,请参考Agent。
- 当
Agent难以支撑业务的复杂度时,请使用workflow,但尽可能将复杂度控制在node和Agent内部。 - 最后,
workflow提供command协议用于提供超出常规pipeline的复杂能力,请参考Command。
请不必提前担心业务的复杂度会逐步提升,TongAgents框架支持从单个Agent逐步迁移到复杂的workflow。当遇到问题再重构也不迟,而且非常简单。
WorkFlowEnv¶
TongAgents 2.0 提供 WorkFlowEnv,基于 chuang_agent 的 DefaultEnvironment。在企业场景中,可以通过 WorkFlowEnv 统一管理 workflow 的创建、运行、工具与 MCP 调度,并复用沙箱与工作区能力。
编排与执行¶
TongAgent提供了一系列开箱即用的node和Agent,用户可以通过组合这些节点来构建复杂的workflow。
workflow的编排有三种方式:
- 通过
workflow实例的add_node和add_edge方法 - 继承
Workflow,并通过@node_declare装饰器定义节点和边 - 通过
WorkflowConfig配置化构建workflow
对于简单使用workflow,可直接使用add_node和add_edge方法。对于可能需要继承和复用的场景,可继承Workflow,并使用@node_declare装饰器。当需要持久化workflow时,可使用配置化的WorkflowConfig方式。
以一个2节点的工作流为例
方式一:add_node和add_edge¶
Workflow实例提供了add_node和add_edge方法,用于添加节点和边。
from tongagents.nodes.llm_node import LLMNode, LLMNodeConfig
from tongagents.workflow.nodes.input_node import InputNode, InputNodeConfig
from tongagents.workflow.simple_workflow import END, START, Workflow
llm_node = LLMNode(
LLMNodeConfig(
name="llm_node",
type="llm",
model={
"model_provider": 1,
"model_name": "gpt-3.5-turbo",
"url": "https://api.openai.com/v1/chat/completions",
"api_key": "your-api-key",
"n": 1,
"stream": False,
},
prompt_template=[{"role": "system", "text": "用户的问题是:{{#sys.query#}}"}],
)
)
input_node = InputNode(
InputNodeConfig(
name="input",
type="input",
)
)
workflow = Workflow()
workflow.add_node(
llm_node,
)
workflow.add_node(
input_node,
)
workflow.add_edge(START, input_node.config.name)
workflow.add_edge(input_node.config.name, llm_node.config.name)
workflow.add_edge(llm_node.config.name, END)
result = workflow.step("你好啊")
print(result)
"""
你好呀!有什么我能帮忙的事情尽管跟我说。
"""
方式二:@node_declare¶
@node_declare装饰器用于定义节点和边,并将其添加到workflow中。
from tongagents.workflow.nodes.simple_node_base import END, START
from tongagents.workflow.simple_workflow import Workflow, node_declare
class DemoWorkflow(Workflow):
@node_declare(name="web_search", edges=[(START, "web_search")])
def web_search(self, event):
return self._search(event)
@node_declare(name="llm", edges=[("web_search", "llm"),("llm", END)])
def llm(self, event):
return self._llm_process(event)
DemoWorkflow实例时,其效果等同于:
workflow = DemoWorkflowAgent()
workflow.add_node(workflow.web_search)
workflow.add_node(workflow.llm)
workflow.add_edge(START, "web_search")
workflow.add_edge("web_search", "llm")
workflow.add_edge("llm", END)
方式三:配置化构建¶
Workflow提供了一个create方法,用于配置化构建workflow。
一个json类型的配置文件示例:
{
"nodes": [
{
"name": "llm123123",
"type": "LLMNode",
"model": {
"model_provider": 1,
"model_name": "fake_model",
"url": "fake_url",
"api_key": "fake_api",
"n": 1,
"stream": false
},
"prompt_template": [
{
"role": "system",
"text": "{{#sys.query#}}",
"id": "2715def2-fa84-4360-a139-772c475b9801"
}
]
},
{
"name": "output123123",
"type": "OutputNode",
"answer": "这是输出节点的模板: {{#llm123123.text#}}"
}
],
"edges": [
{
"source": "llm123123",
"destination": "output123123"
}
]
}
workflow_config.json文件中,则可以通过以下方式构建workflow:
from tongagents.workflow.simple_workflow import Workflow
with open("workflow_config.json", encoding="utf-8") as f:
workflow_config=f.read().strip()
workflow = Workflow.create(workflow_config)
workflow.step("你好")
Node¶
workflow中可以作为节点的对象包括:
function:一个函数,输入为Event或Event的迭代器,输出为Event或Event的迭代器NodeBase的子类:基于NodeBase的节点agent:自主规划的Agent可以被视作一个节点workflow:workflow中可以包含其他workflow,并将其视为内部的节点
开箱即用的节点¶
我们提供了一些常用的节点,开发者可以参考下表:
| 分类 | 节点类型 | 说明 |
|---|---|---|
| 基础 | llm |
基于LLM的节点 |
| 基础 | output |
输出节点 |
| 基础 | input |
输入节点 |
| 基础 | http |
HTTP请求节点 |
| 基础 | if_else |
条件判断节点 |
| 基础 | intent_recognition |
意图识别节点 |
| 工具 | tool |
基于工具的节点 |
| 具身 | mm_info_extract_node |
多模态信息提取 |
| 具身 | embody_command_node |
具身指令执行 |
其中,工具类包括:
amap_weather:高德地图天气查询python_code_interpreter:代码解释器image_understanding:图像理解long_document_understanding:长文档理解web_crawler:网页爬虫web_search:网页搜索
等任意本地、远程、知识库、以及用户自定义的工具。
输入节点¶
输入节点用于接收用户输入。
JSON配置示例:
代码实现示例:
from tongagents.workflow.nodes.input_node import InputNode, InputNodeConfig
input_node = InputNode(
InputNodeConfig(
name="input",
type="input",
with_context_when_called=True,
)
)
workflow.add_node("input", input_node)
HTTP节点¶
HTTP节点用于发送HTTP/HTTPS请求。
JSON配置示例:
{
"name": "http_node",
"type": "http",
"url": "https://api.example.com/test",
"method": "POST",
"headers": {
"Content-Type": "application/json"
},
"body": {
"test": "data"
},
"timeout": 30,
"verify_ssl": true,
"response_key": "http_response"
}
代码实现示例:
from tongagents.nodes.http_node import HTTPNode, HTTPNodeConfig
http_node = HTTPNode(
HTTPNodeConfig(
name="http_node",
type="http",
url="https://api.example.com/test",
method="POST",
headers={"Content-Type": "application/json"},
body={"test": "data"},
timeout=30,
verify_ssl=True,
response_key="http_response"
)
)
workflow.add_node("http_node", http_node)
条件判断节点¶
条件判断节点用于根据条件选择不同的执行路径。
JSON配置示例:
{
"name": "if_else",
"type": "if_else",
"condition": "{{#input.text#}} == 'yes'",
"true_branch": "output_yes",
"false_branch": "output_no"
}
代码实现示例:
from tongagents.workflow.nodes.ifelse_node import IfElseNode, IfElseNodeConfig
if_else_node = IfElseNode(
IfElseNodeConfig(
name="if_else",
type="if_else",
condition="{{#input.text#}} == 'yes'",
true_branch="output_yes",
false_branch="output_no"
)
)
workflow.add_node("if_else", if_else_node)
意图识别节点¶
意图识别节点用于识别用户输入的意图。
JSON配置示例:
{
"name": "intent_recognition",
"type": "intent_recognition",
"model": {
"model_provider": 1,
"model_name": "gpt-3.5-turbo",
"url": "https://api.openai.com/v1/chat/completions",
"api_key": "your-api-key",
"n": 1,
"stream": false
},
"intent_mappings": {
"greeting": "greeting_node",
"farewell": "farewell_node",
"general": "general_node"
},
"default_node": "general_node",
"query": "{{#sys.query#}}"
}
代码实现示例:
from tongagents.nodes.intent_recognition_node import IntentRecognitionNode, IntentRecognitionNodeConfig
from tongagents.agents.llm.base import ModelConfig
intent_node = IntentRecognitionNode(
IntentRecognitionNodeConfig(
name="intent_recognition",
type="intent_recognition",
with_context_when_called=True,
model=ModelConfig(
model_provider=1,
model_name="gpt-3.5-turbo",
url="https://api.openai.com/v1/chat/completions",
api_key="your-api-key",
n=1,
stream=False
),
intent_mappings={
"greeting": "greeting_node",
"farewell": "farewell_node",
"general": "general_node"
},
default_node="general_node",
query="{{#sys.query#}}"
)
)
workflow.add_node("intent_recognition", intent_node)
LLM节点¶
LLM节点是workflow中最重要的节点之一,用于执行LLM任务。
JSON配置示例:
{
"name": "llm_node",
"type": "llm",
"model": {
"model_provider": 1,
"model_name": "gpt-3.5-turbo",
"url": "https://api.openai.com/v1/chat/completions",
"api_key": "your-api-key",
"n": 1,
"stream": false
},
"prompt_template": [
{
"role": "system",
"text": "web检索的结果是:{{#web_search.tool_result#}}用户的问题是:{{#sys.query#}}"
}
]
}
代码实现示例:
from tongagents.nodes.llm_node import LLMNode, LLMNodeConfig
llm_node = LLMNode(
LLMNodeConfig(
name="llm_node",
type="llm",
model={
"model_provider": 1,
"model_name": "gpt-3.5-turbo",
"url": "https://api.openai.com/v1/chat/completions",
"api_key": "your-api-key",
"n": 1,
"stream": False
},
prompt_template=[
{
"role": "system",
"text": "web检索的结果是:{{#web_search.tool_result#}}用户的问题是:{{#sys.query#}}"
}
]
)
)
workflow.add_node("llm_node", llm_node)
工具节点¶
工具节点用于执行工具任务。
JSON配置示例:
{
"name": "web_search",
"type": "tool",
"tool_name": "web_search",
"tool_args": {
"query": "{{#sys.query#}}"
}
}
代码实现示例:
from tongagents.workflow.nodes.tool_node import ToolNode, ToolNodeConfig
tool_node = ToolNode(
ToolNodeConfig(
name="web_search",
type="tool",
tool_name="web_search",
tool_args={
"query": "{{#sys.query#}}"
}
)
)
workflow.add_node("web_search", tool_node)
输出节点¶
输出节点用于输出结果。
JSON配置示例:
代码实现示例:
from tongagents.workflow.nodes.output_node import OutputNode, OutputNodeConfig
output_node = OutputNode(
OutputNodeConfig(
name="output",
type="output",
answer="{{#llm_node.text#}}",
return_dict=True
)
)
workflow.add_node("output", output_node)
节点连接¶
节点之间通过边(edge)连接,定义执行流程。
JSON配置示例:
{
"edges": [
{
"source": {
"cell": "input",
"port": "out.input"
},
"target": {
"cell": "llm_node",
"port": "in.llm_node"
}
},
{
"source": {
"cell": "llm_node",
"port": "out.llm_node"
},
"target": {
"cell": "output",
"port": "in.output"
}
}
]
}
代码实现示例:
from tongagents.workflow.workflow import START, END
# 添加边
workflow.add_edge(START, "input")
workflow.add_edge("input", "llm_node")
workflow.add_edge("llm_node", "output")
workflow.add_edge("output", END)
其他节点可参考具体的代码实现和接口文档,这里不再赘述。
自定义节点¶
基于function的节点¶
import requests
from tongagents.agents.llm.messages import UserPromptMessage
from tongagents.workflow.simple_workflow import END, START, Workflow
def http_request(query: str) -> dict:
response = requests.request(
"GET", "https://api.example.com/search", params={"q": query}
)
return response.json()
workflow = Workflow()
workflow.add_node(
http_request,
NodeConfig(
name = "http_request",
type = "DecroteNode",
with_context_when_called = True
)
)
workflow.add_edge(START, "http_request")
workflow.add_edge("http_request", END)
result = workflow.step(UserPromptMessage("哪吒2票房是多少?"))
print(result)
function类型的节点适用于一些简单的、无需复用的逻辑,适合用于快速构建简单的workflow。
继承NodeBase的节点¶
- 定义
NodeConfig - 继承
NodeBase,并实现run方法
from typing import Any, Iterator
import requests
from tongagents.workflow.nodes.simple_node_base import NodeBase, NodeConfig
from tongagents.workflow.simple_workflow import RunContext, node_declare
class APINodeConfig(NodeConfig):
url: str
method: str
headers: dict
body: dict
class APINode(NodeBase):
def __init__(self, config: NodeConfig):
print(f"init llm node, config: {config}")
if not isinstance(config, APINodeConfig):
raise ValueError("config must be a APINodeConfig")
super().__init__(config)
@node_declare()
def run(self, events: Iterator[Any], context: RunContext = None) -> Iterator[Any]:
for event in events:
# 发送请求
response = requests.request(self.config.method, self.config.url, headers=self.config.headers, data=self.config.body)
# 返回响应
yield response
workflow中复用。
NodeConfig 定义了节点运行时的配置,从而可以实现节点复用。
NodeBase#run 定义了节点运行时的逻辑。
数据处理模式¶
在TongAgents workflow框架中,数据处理模式是一个关键概念,它定义了节点如何处理来自多个上游节点的输入事件。引入数据处理模式主要基于以下考虑:
- 数据流调度机制:在默认情况下,workflow的调度遵循事件驱动模型。对于具有多个上游输入的节点,一旦任何一个输入事件到达,该节点便可能被触发执行。这种机制虽然能够提高并行处理效率,但在某些业务场景下可能无法满足需求。
-
节点处理语义差异:不同业务场景下的节点对输入数据的处理需求各不相同。例如:
- 某些节点虽然连接了多个上游数据源,但这些数据源产生的是结构相同的数据,节点只需要简单地对每个事件单独处理。
- 而另一些节点则需要汇聚所有上游数据,形成一个完整的上下文后再进行综合处理和决策。
通过配置适当的数据处理模式,开发者可以精确控制节点的输入处理行为,实现更加复杂和灵活的数据流编排。
MERGE_EVENTS_AS_EVENT_MAP¶
MERGE_EVENTS_AS_EVENT_MAP 是一个特殊的数据处理模式,它允许一个节点等待所有上游依赖节点的数据都准备好后,再进行处理。这在需要整合多个数据源的场景中特别有用。
from collections.abc import Iterator
from tongagents.base_agent import WorkflowAgent
from tongagents.workflow.nodes.simple_node_base import END, START, InputProcessMode
from tongagents.workflow.simple_workflow import node_declare
class AggregationAgent(WorkflowAgent):
def __init__(self):
super().__init__()
@node_declare(
name="weather_node",
edges=[(START, "weather_node")],
)
def weather_node(self, event) -> str:
if event == "杭州":
return "晴天"
return "多云"
@node_declare(
name="attraction_node",
edges=[(START, "attraction_node")],
)
def attraction_node(self, event) -> str:
if event == "杭州":
return "西湖"
return "天安门"
@node_declare(
name="hotel_node",
edges=[(START, "hotel_node")],
)
def hotel_node(self, event) -> str:
if event == "杭州":
return "ABC酒店"
return "DEF酒店"
@node_declare(
name="travel_advisor",
edges=[
("weather_node", "travel_advisor"),
("attraction_node", "travel_advisor"),
("hotel_node", "travel_advisor"),
("travel_advisor", END),
],
input_process_mode=InputProcessMode.MERGE_EVENTS_AS_EVENT_MAP,
)
def travel_advisor(self, event) -> Iterator[str]:
return f"今天天气{event['weather_node']},去{event['attraction_node']},住在{event['hotel_node']}"
if __name__ == "__main__":
agent = AggregationAgent()
output = [item for item in agent.stream("杭州")]
print(output)
output = [item for item in agent.stream("北京")]
print(output)
"""
['今天天气晴天,去西湖,住在ABC酒店']
['今天天气多云,去天安门,住在DEF酒店']
"""
在这个例子中,travel_advisor 节点使用了 MERGE_EVENTS_AS_EVENT_MAP 模式,它会等待天气、景点和酒店三个数据源节点的结果全部准备好后,才会执行。这确保了LLM可以基于完整的信息生成全面的旅游建议,而不是基于部分信息生成不完整的建议。
这种模式特别适用于以下场景:
- 需要整合多个异步数据源的信息
- 所有数据源的信息都是必需的,缺一不可
- 需要基于完整信息做出决策或生成内容
调试模式¶
TongAgents workflow框架提供了强大的调试功能,帮助开发者深入了解workflow的执行过程。调试模式能够透出节点执行的全生命周期状态,支持同步和异步两种执行模式,同时保持原有工作流执行逻辑不变。
启用调试模式¶
在执行workflow时,通过设置debug=True参数即可启用调试模式。调试模式下,执行方法会返回两个生成器:
- event_generator: 原始工作流执行结果
- debug_generator: 调试信息流
# 同步执行模式
resp, debug_resp = workflow.run(input_generator(), debug=True)
resp, debug_resp = workflow.stream("你好", debug=True)
# 异步执行模式
resp, debug_resp = workflow.arun(input_generator(), debug=True)
resp, debug_resp = workflow.astream("你好", debug=True)
调试事件类型¶
调试模式会产生四种类型的调试事件,涵盖节点执行的完整生命周期:
| 事件类型 | 说明 | 触发时机 |
|---|---|---|
node_start |
节点开始执行 | 节点准备开始处理输入事件时 |
node_process |
节点处理中间结果 | 节点正在处理事件时(可选) |
node_end |
节点执行完成 | 节点成功完成处理时 |
node_exception |
节点执行异常 | 节点执行过程中发生异常时 |
调试事件结构¶
每个调试事件都遵循统一的DebugEvent结构:
不同类型的调试事件包含的数据字段如下:
node_start/node_end/node_process¶
{
"node_name": "producer", # 节点名称
"in_event": {...}, # 输入事件数据
"timestamp": "2024-01-01...", # 时间戳(可选)
}
node_exception¶
{
"node_name": "consumer", # 节点名称
"error_info": "测试异常", # 异常信息
"timestamp": "2024-01-01...", # 时间戳(可选)
}
基本使用示例¶
以下是一个完整的调试模式使用示例:
from tongagents.base_agent import WorkflowAgent
from tongagents.entity import Event
from tongagents.workflow.nodes.simple_node_base import END, START
from tongagents.workflow.simple_workflow import RunContext, node_declare
class DebugWorkflowAgent(WorkflowAgent):
@node_declare(
name="producer",
edges=[(START, "producer"), ("producer", "consumer")],
)
def producer(self, reqs):
return Event(hello="你好")
@node_declare(
name="consumer",
edges=[("consumer", END)],
with_context_when_called=True,
)
def consumer(self, event, context: RunContext[dict]):
return Event(hello="你好2")
# 创建workflow实例
workflow = DebugWorkflowAgent()
# 启用调试模式执行
def input_generator():
yield "用户输入"
resp, debug_resp = workflow.run(input_generator(), debug=True)
# 处理执行结果
for result in resp:
print(f"执行结果: {result}")
# 处理调试信息
for debug_event in debug_resp:
print(f"调试事件: {debug_event.type} - {debug_event.data}")
"""
执行结果: description=None event_happen_time='2025-05-30 16:59:13' hello='你好2' data_type='Event'
调试事件: node_start - {'node_name': 'producer', 'in_event': '用户输入'}
调试事件: node_end - {'node_name': 'producer', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好', data_type='Event')}
调试事件: node_start - {'node_name': 'consumer', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好', data_type='Event')}
调试事件: node_end - {'node_name': 'consumer', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好2', data_type='Event')}
调试事件: node_start - {'node_name': '__end__', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好2', data_type='Event')}
调试事件: node_end - {'node_name': '__end__', 'in_event': Event(description=None, event_happen_time='2025-05-30 16:59:13', hello='你好2', data_type='Event')}
"""
异常调试¶
当节点执行过程中发生异常时,调试模式会捕获异常信息并生成node_exception事件:
class ExceptionWorkflowAgent(WorkflowAgent):
@node_declare(
name="error_node",
edges=[(START, "error_node"), ("error_node", END)],
)
def error_node(self, event):
raise Exception("模拟异常")
return Event(result="不会执行到这里")
workflow = ExceptionWorkflowAgent()
resp, debug_resp = workflow.stream("测试", debug=True)
try:
# 处理执行结果(会抛出异常)
for result in resp:
print(result)
except Exception as e:
print(f"捕获到异常: {e}")
# 查看调试信息中的异常事件
for debug_event in debug_resp:
if debug_event.type == "node_exception":
print(f"节点异常: {debug_event.data['node_name']} - {debug_event.data['error_info']}")
"""
捕获到异常: 模拟异常
节点异常: error_node - 模拟异常
"""
异步调试模式¶
对于异步workflow,调试模式的使用方式类似,但需要使用异步迭代:
import asyncio
class AsyncWorkflowAgent(WorkflowAgent):
@node_declare(
name="async_producer",
edges=[(START, "async_producer"), ("async_producer", END)],
)
async def async_producer(self, event):
await asyncio.sleep(0.1) # 模拟异步操作
return Event(result="异步处理完成")
async def main():
workflow = AsyncWorkflowAgent()
# 异步调试模式
resp, debug_resp = workflow.astream("测试", debug=True)
# 处理执行结果
async for result in resp:
print(f"异步结果: {result}")
# 处理调试信息
async for debug_event in debug_resp:
print(f"异步调试事件: {debug_event.type} - {debug_event.data}")
# 运行异步示例
asyncio.run(main())
"""
异步结果: description=None event_happen_time='2025-05-30 17:01:51' result='异步处理完成' data_type='Event'
异步调试事件: node_start - {'node_name': 'async_producer', 'in_event': '测试'}
异步调试事件: node_end - {'node_name': 'async_producer', 'in_event': Event(description=None, event_happen_time='2025-05-30 17:01:51', result='异步处理完成', data_type='Event')}
异步调试事件: node_start - {'node_name': '__end__', 'in_event': Event(description=None, event_happen_time='2025-05-30 17:01:51', result='异步处理完成', data_type='Event')}
异步调试事件: node_end - {'node_name': '__end__', 'in_event': Event(description=None, event_happen_time='2025-05-30 17:01:51', result='异步处理完成', data_type='Event')}
"""
Human in the Loop¶
TongAgents框架提供了优雅的Human in the Loop机制,允许AI工作流在执行过程中暂停并等待人类输入,然后继续执行。这种机制具有以下特点:
- 多次中断支持:在单个节点中可以多次请求人类输入
- 幂等性保证:支持重试机制,确保执行的一致性
- 状态管理:优雅的中断状态管理和恢复机制
- 简单易用:通过简单的
interrupt()函数调用即可实现人机交互
核心概念¶
中断机制¶
interrupt()函数用于在workflow节点执行过程中暂停执行,等待人类输入:
from tongagents.workflow.nodes.simple_node_base import interrupt
def consumer(self, event, context: RunContext[dict]):
# 第一次中断,等待人类输入
res1 = interrupt("请提供第一个参数", context, "consumer", event)
# 第二次中断,等待人类输入
res2 = interrupt("请提供第二个参数", context, "consumer", event)
return Event(result=res1 + res2)
恢复机制¶
使用ResumeCommand来响应中断并提供人类输入:
from tongagents.workflow.commands.entrance_takeover_command import ResumeCommand
# 响应中断
resp = workflow.step(ResumeCommand(human_input="用户输入的内容"))
基本使用示例¶
以下是一个完整的Human in the Loop使用示例:
from collections.abc import Iterator
from tongagents.base_agent import WorkflowAgent
from tongagents.entity import Event
from tongagents.workflow.commands.entrance_takeover_command import ResumeCommand
from tongagents.workflow.nodes.simple_node_base import END, START, interrupt
from tongagents.workflow.simple_workflow import RunContext, node_declare
class InteractiveWorkflow(WorkflowAgent):
@node_declare(
name="producer",
edges=[(START, "producer"), ("producer", "consumer")],
)
def producer(self, reqs) -> Iterator[str]:
return Event(hello="开始处理")
@node_declare(
name="consumer",
edges=[("consumer", END)],
with_context_when_called=True,
)
def consumer(self, event, context: RunContext[dict]) -> Iterator[str]:
# 第一次中断
res1 = interrupt("请输入第一个参数", context, "consumer", event)
# 第二次中断
res2 = interrupt("请输入第二个参数", context, "consumer", event)
return Event(result=f"处理结果: {res1} + {res2}")
# 创建workflow实例
workflow = InteractiveWorkflow()
# 第一次执行,触发第一个中断
resp = workflow.step("开始")
print("[1]", resp) # 输出: 请输入第一个参数
# 响应第一次中断,触发第二个中断
resp = workflow.step(ResumeCommand(human_input="参数1"))
print("[2]", resp) # 输出: 请输入第二个参数
# 响应第二次中断,完成执行
resp = workflow.step(ResumeCommand(human_input="参数2"))
print("[3]", resp) # 输出: Event(result="处理结果: 参数1 + 参数2")
多次中断的执行流程¶
当节点中包含多次interrupt()调用时,执行流程如下:
时间线 ------>
第一次执行:
interrupt_id=0 ----[中断]----> 等待人类输入
|
第二次执行(恢复): v
interrupt_id=0 ----[返回缓存]---> interrupt_id=1 ----[中断]----> 等待人类输入
|
第三次执行(恢复): v
interrupt_id=0 ----[返回缓存]---> interrupt_id=1 ----[返回缓存]---> 继续执行完成
幂等性机制¶
TongAgents的中断机制具有重要的幂等性特征:
执行恢复点¶
重要:当workflow从中断恢复时,执行是从中断节点的第一行代码开始,而不是阻塞在interrupt()调用处。这意味着:
def consumer(self, event, context: RunContext[dict]):
print("节点开始执行") # 每次恢复都会重新执行这一行
res1 = interrupt("第一次中断", context, "consumer", event)
print(f"获得第一次输入: {res1}") # 恢复后才会执行这一行
res2 = interrupt("第二次中断", context, "consumer", event)
print(f"获得第二次输入: {res2}") # 最终恢复后才会执行这一行
return Event(result=res1 + res2)
中断计数器¶
框架使用interrupt_counter来确保幂等性:
- 每次进入节点时重置计数器
- 每次调用
interrupt()时计数器递增 - 重试时通过计数器判断是否返回缓存的输入
输入事件一致性¶
框架保证多次恢复时的输入event保持一致,确保节点逻辑的可预测性。
循环中的中断¶
支持在for循环中调用interrupt(),但需要注意以下限制:
def consumer(self, event, context: RunContext[dict]):
results = []
# ✅ 推荐:循环次数固定且取决于输入event
items = event.get("items", []) # 根据输入确定循环次数
for i, item in enumerate(items):
user_input = interrupt(f"请处理项目 {item}", context, "consumer", event)
results.append(user_input)
return Event(results=results)
注意事项:
- 避免随机循环次数:循环次数应该是确定的,基于输入event计算得出
- 避免条件依赖中断结果:不要让循环的继续条件依赖于中断的返回值
# ❌ 不推荐:循环次数不确定
def bad_example(self, event, context: RunContext[dict]):
while True:
user_input = interrupt("继续吗?", context, "consumer", event)
if user_input == "停止":
break # 这会导致重试时行为不一致
高级用法示例¶
条件中断¶
def conditional_consumer(self, event, context: RunContext[dict]):
result = event.get("auto_process", False)
if not result:
# 只有在需要时才中断
user_decision = interrupt("是否继续处理?", context, "consumer", event)
if user_decision.lower() != "yes":
return Event(status="cancelled")
# 继续正常处理
return Event(status="completed")
数据验证中断¶
def validation_consumer(self, event, context: RunContext[dict]):
data = event.get("data")
# 验证数据
if not self.validate_data(data):
corrected_data = interrupt(
f"数据验证失败: {data}。请提供正确的数据",
context,
"consumer",
event
)
data = corrected_data
# 处理验证后的数据
return Event(processed_data=self.process(data))
通过Human in the Loop机制,TongAgents能够在AI自动化和人类智慧之间建立有效的桥梁,实现更加灵活和可控的工作流执行。
Command¶
Command 是workflow的扩展能力,用于支持一些超出常规pipeline的复杂能力。
Node 可以通过yield 返回Command,从而实现Node之间的特殊数据交互逻辑。
目前支持的Command包括:
TransferCommand: 将当前节点的事件转移给另一个节点EntranceTakeOverCommand: 将当前节点的事件转移给另一个节点,并接管整个流程EntranceTakeOffCommand: 将当前节点的事件转移给另一个节点,并停止当前节点的执行RestartNodeCommand: 重新执行当前节点