监控报警智能体示例¶
TongAgents 框架可以实现事件处理的并行化,基于此可以实现各种实时处理系统。借助并行化机制,可以实现对分级事件的实时处理。 以下的例子展示了利用TongAgents来构建一个实时的报警处理系统。 我们模拟环境产生2种类型的事件,一种是紧急事件,需要快速处理,一种是普通报警,处理起来时长比较长,无需实时快速处理。通过我们的TongAgents框架实现的预警处理Agent,具有实时处理紧急事件的能力,而不会因为其他普通事件的处理导致无法实时响应紧急事件。
环境准备¶
代码示例¶
下面示例代码演示了如何构建一个实时监控系统:
- 利用
MonitorEnv生成紧急和常规监控事件; -
使用
FastSlowAgent通过意图分类和路由节点将事件分发到: -
快速处理系统(紧急告警);
- 慢速处理系统(常规数据详细分析)。
import time
from collections.abc import Iterator
from tongagents.agents.planner_agent import TongPlanner
from tongagents.base_agent import WorkflowAgent
from tongagents.entity import Action, Event
from tongagents.workflow.commands.transfer_command import TransferCommand
from tongagents.workflow.nodes.simple_node_base import InputProcessMode, RunMode
from tongagents.workflow.simple_workflow import END, START, node_declare
class MonitorEnv:
def event_generator(self) -> Iterator[Event]:
"""Generate both emergency and regular events"""
yield {
"type": "regular",
"system": "车间系统",
"metric": "cpu",
"value": "80%",
"timestamp": time.time(),
}
time.sleep(2)
yield {
"type": "emergency",
"alert_type": "设备",
"severity": "高",
"timestamp": time.time(),
}
time.sleep(10)
def action_process(self, action: Action):
resp = action.model_dump()
print(
f"{resp.get('event_happen_time')} 系统响应:"
f"{resp.get('message')} "
f"{'' if resp.get('analysis') is None else ' ' + resp.get('analysis')}"
)
class FastSlowAgent(WorkflowAgent):
planner: TongPlanner = None
def __init__(self, planner):
super().__init__()
self.planner = planner
@node_declare(
name="event_classifier",
edges=[(START, "event_classifier")],
run_mode=RunMode.PARALLEL,
input_process_mode=InputProcessMode.MERGE_GENERATORS_AS_EVENT_SEQUENCE,
)
def classify_event(self, event: dict) -> dict:
"""Classify events into emergency or regular"""
is_emergency = event.get("type") == "emergency" and event.get(
"severity", "低"
) in ["高", "中"]
return {"event": event, "type": "emergency" if is_emergency else "regular"}
@node_declare(
name="router",
edges=[("event_classifier", "router")],
)
def route_event(self, data: dict) -> Event:
"""Route events to appropriate system"""
if data["type"] == "emergency":
return TransferCommand(
transfer_event=data["event"], transfer_target="fast_system"
)
return TransferCommand(
transfer_event=data["event"], transfer_target="slow_system"
)
@node_declare(
name="fast_system",
edges=[("router", "fast_system"), ("fast_system", END)],
)
def fast_system(self, event: dict) -> Action:
"""Handle emergency events immediately"""
alert_type = event.get("alert_type", "未知")
severity = event.get("severity", "高")
response = {
"message": f"【紧急事件实时处理】接收到 - {severity}级{alert_type}告警!",
"timestamp": time.time(),
"response_type": "emergency",
}
return Action(**response)
@node_declare(
name="slow_system",
edges=[("router", "slow_system"), ("slow_system", END)],
run_mode=RunMode.PARALLEL_ITERATION,
)
def slow_system(self, event: dict) -> Iterator[Action]:
"""Handle regular events with detailed analysis"""
# Simulate time-consuming analysis
system = event.get("system", "未知")
metric = event.get("metric", "未知")
value = event.get("value", "未知")
yield Action(
**{
"message": f"【普通事件】开始分析 {system} 系统的 {metric} 指标...",
"timestamp": time.time(),
}
)
time.sleep(5)
analysis_result = self.planner.plan_once(Event(**event))
yield Action(
**{
"message": f"【普通事件】分析完成: {system} 的 {metric} 值为 {value}",
"analysis": analysis_result.resp,
"timestamp": time.time(),
}
)
def run_with_env(self, env):
resp = self.stream(env.event_generator())
for action in resp:
env.action_process(action)
if __name__ == "__main__":
# Create planner with specific role
system_planner = TongPlanner(
"""
你是一个智能工厂的系统分析师,负责:
1. 分析系统运行数据
2. 提供优化建议
3. 预测潜在问题
重点关注指标:
- CPU使用率 (警戒值: 85%)
- 内存使用率 (警戒值: 90%)
- 磁盘使用率 (警戒值: 95%)
- 网络延迟 (警戒值: 100ms)
- 错误率 (警戒值: 1%)
分析时需要考虑:
1. 历史趋势
2. 负载pattern
3. 资源使用效率
4. 潜在风险
"""
)
# Create and run agent
agent = FastSlowAgent(system_planner)
env = MonitorEnv()
print("智能工厂监控系统启动")
print("支持紧急告警和常规监控数据分析")
print("-" * 50)
agent.run_with_env(env)
构建的流程:¶
输出结果如下:¶
智能工厂监控系统启动 支持紧急告警和常规监控数据分析
- 2025-03-14 12:00:50 系统响应:【普通事件】开始分析 车间系统 系统的 cpu 指标...
- 2025-03-14 12:00:52 系统响应:【紧急事件实时处理】接收到 - 高级设备告警!
-
2025-03-14 12:01:02 系统响应:【普通事件】分析完成: 车间系统 的 cpu 值为 80% 根据所提供的环境事件信息,当前的CPU使用率为80%,低于设定的警戒值85%。这表明当前系统的CPU负载尚处于安全范围内。然而,为了提供全面的分析和优化建议,我们需要结合更多方面的数据进行综合考量,以下是需要重点关注和分析的几个方面:
-
历史趋势:需要分析CPU使用率的历史数据,观察其是否存在上升趋势,特别是接近警戒值的情况。如果存在明显上升趋势,那么未来可能会触及或超出警戒值。
-
负载Pattern:根据车间系统的特性分析一天内或一周内CPU使用率的变化模式。是否存在特定时间段内CPU使用显著增加?此外,考虑到特殊工作负荷如大规模生产启动或结束时对CPU使用的影响。
-
资源使用效率:考虑CPU使用是否高效,即高CPU使用率是否伴随着系统性能的显著提升。如发现效率低下的使用模式,则需进一步优化应用或硬件配置。
-
潜在风险预判:基于当前和历史数据,预测在预定生产任务或其他可能增加系统负载的情况下,CPU是否有可能超出警戒值,从而可能导致系统性能下降或其他负面影响。
综上,虽然当前CPU使用率80%没有超出警戒值,但为保持系统高效运行并预防未来可能的问题,建议定期监控CPU及其他关键资源的使用情况,并依据历史数据和预测分析调整系统以优化资源使用效率。
说明¶
- 示例展示了如何利用 TongAgents 架构实时采集告警事件,并基于事件类型自动分发到各个处理系统。
- 快速响应模块确保紧急告警即时处理,常规模块则经过分步延时分析,体现了框架在实时监控与深度分析场景下的优秀调度能力。
通过本示例,你可以快速掌握分级告警调度工作流的构建方法,充分体验 TongAgents 框架在复杂事件处理中的优势。