跳转至

监控报警智能体示例

TongAgents 框架可以实现事件处理的并行化,基于此可以实现各种实时处理系统。借助并行化机制,可以实现对分级事件的实时处理。 以下的例子展示了利用TongAgents来构建一个实时的报警处理系统。 我们模拟环境产生2种类型的事件,一种是紧急事件,需要快速处理,一种是普通报警,处理起来时长比较长,无需实时快速处理。通过我们的TongAgents框架实现的预警处理Agent,具有实时处理紧急事件的能力,而不会因为其他普通事件的处理导致无法实时响应紧急事件。

环境准备

  • 安装 TongAgents SDK,参见安装指南
  • 配置模型服务,参考模型配置
  • 确保 Python 环境和必要依赖已安装

代码示例

下面示例代码演示了如何构建一个实时监控系统:

  1. 利用 MonitorEnv 生成紧急和常规监控事件;
  2. 使用 FastSlowAgent 通过意图分类和路由节点将事件分发到:

  3. 快速处理系统(紧急告警);

  4. 慢速处理系统(常规数据详细分析)。
examples/fast_slow_react/fast_slow_agent.py
import time
from collections.abc import Iterator

from tongagents.agents.planner_agent import TongPlanner
from tongagents.base_agent import WorkflowAgent
from tongagents.entity import Action, Event
from tongagents.workflow.commands.transfer_command import TransferCommand
from tongagents.workflow.nodes.simple_node_base import InputProcessMode, RunMode
from tongagents.workflow.simple_workflow import END, START, node_declare


class MonitorEnv:
    def event_generator(self) -> Iterator[Event]:
        """Generate both emergency and regular events"""

        yield {
            "type": "regular",
            "system": "车间系统",
            "metric": "cpu",
            "value": "80%",
            "timestamp": time.time(),
        }
        time.sleep(2)
        yield {
            "type": "emergency",
            "alert_type": "设备",
            "severity": "高",
            "timestamp": time.time(),
        }
        time.sleep(10)

    def action_process(self, action: Action):
        resp = action.model_dump()
        print(
            f"{resp.get('event_happen_time')} 系统响应:"
            f"{resp.get('message')} "
            f"{'' if resp.get('analysis') is None else ' ' + resp.get('analysis')}"
        )


class FastSlowAgent(WorkflowAgent):
    planner: TongPlanner = None

    def __init__(self, planner):
        super().__init__()
        self.planner = planner

    @node_declare(
        name="event_classifier",
        edges=[(START, "event_classifier")],
        run_mode=RunMode.PARALLEL,
        input_process_mode=InputProcessMode.MERGE_GENERATORS_AS_EVENT_SEQUENCE,
    )
    def classify_event(self, event: dict) -> dict:
        """Classify events into emergency or regular"""
        is_emergency = event.get("type") == "emergency" and event.get(
            "severity", "低"
        ) in ["高", "中"]
        return {"event": event, "type": "emergency" if is_emergency else "regular"}

    @node_declare(
        name="router",
        edges=[("event_classifier", "router")],
    )
    def route_event(self, data: dict) -> Event:
        """Route events to appropriate system"""
        if data["type"] == "emergency":
            return TransferCommand(
                transfer_event=data["event"], transfer_target="fast_system"
            )
        return TransferCommand(
            transfer_event=data["event"], transfer_target="slow_system"
        )

    @node_declare(
        name="fast_system",
        edges=[("router", "fast_system"), ("fast_system", END)],
    )
    def fast_system(self, event: dict) -> Action:
        """Handle emergency events immediately"""
        alert_type = event.get("alert_type", "未知")
        severity = event.get("severity", "高")
        response = {
            "message": f"【紧急事件实时处理】接收到 - {severity}{alert_type}告警!",
            "timestamp": time.time(),
            "response_type": "emergency",
        }
        return Action(**response)

    @node_declare(
        name="slow_system",
        edges=[("router", "slow_system"), ("slow_system", END)],
        run_mode=RunMode.PARALLEL_ITERATION,
    )
    def slow_system(self, event: dict) -> Iterator[Action]:
        """Handle regular events with detailed analysis"""
        # Simulate time-consuming analysis
        system = event.get("system", "未知")
        metric = event.get("metric", "未知")
        value = event.get("value", "未知")
        yield Action(
            **{
                "message": f"【普通事件】开始分析 {system} 系统的 {metric} 指标...",
                "timestamp": time.time(),
            }
        )
        time.sleep(5)

        analysis_result = self.planner.plan_once(Event(**event))
        yield Action(
            **{
                "message": f"【普通事件】分析完成: {system}{metric} 值为 {value}",
                "analysis": analysis_result.resp,
                "timestamp": time.time(),
            }
        )

    def run_with_env(self, env):
        resp = self.stream(env.event_generator())
        for action in resp:
            env.action_process(action)


if __name__ == "__main__":
    # Create planner with specific role
    system_planner = TongPlanner(
        """
    你是一个智能工厂的系统分析师,负责:
    1. 分析系统运行数据
    2. 提供优化建议
    3. 预测潜在问题

    重点关注指标:
    - CPU使用率 (警戒值: 85%)
    - 内存使用率 (警戒值: 90%)
    - 磁盘使用率 (警戒值: 95%)
    - 网络延迟 (警戒值: 100ms)
    - 错误率 (警戒值: 1%)

    分析时需要考虑:
    1. 历史趋势
    2. 负载pattern
    3. 资源使用效率
    4. 潜在风险
    """
    )

    # Create and run agent
    agent = FastSlowAgent(system_planner)
    env = MonitorEnv()
    print("智能工厂监控系统启动")
    print("支持紧急告警和常规监控数据分析")
    print("-" * 50)
    agent.run_with_env(env)

构建的流程:

STARTevent_classifier(事件分类)router(路由)fast_system(紧急处理系统)slow_system(普通处理系统)END输入事件分类事件路由紧急事件处理系统普通事件处理系统紧急事件处理结果输出普通事件处理多次输出
STARTevent_classifier(事件分类)router(路由)fast_system(紧急处理系统)slow_system(普通处理系统)END输入事件分类事件路由紧急事件处理系统普通事件处理系统紧急事件处理结果输出普通事件处理多次输出

输出结果如下:

智能工厂监控系统启动 支持紧急告警和常规监控数据分析

  • 2025-03-14 12:00:50 系统响应:【普通事件】开始分析 车间系统 系统的 cpu 指标...
  • 2025-03-14 12:00:52 系统响应:【紧急事件实时处理】接收到 - 高级设备告警!
  • 2025-03-14 12:01:02 系统响应:【普通事件】分析完成: 车间系统 的 cpu 值为 80% 根据所提供的环境事件信息,当前的CPU使用率为80%,低于设定的警戒值85%。这表明当前系统的CPU负载尚处于安全范围内。然而,为了提供全面的分析和优化建议,我们需要结合更多方面的数据进行综合考量,以下是需要重点关注和分析的几个方面:

  • 历史趋势:需要分析CPU使用率的历史数据,观察其是否存在上升趋势,特别是接近警戒值的情况。如果存在明显上升趋势,那么未来可能会触及或超出警戒值。

  • 负载Pattern:根据车间系统的特性分析一天内或一周内CPU使用率的变化模式。是否存在特定时间段内CPU使用显著增加?此外,考虑到特殊工作负荷如大规模生产启动或结束时对CPU使用的影响。

  • 资源使用效率:考虑CPU使用是否高效,即高CPU使用率是否伴随着系统性能的显著提升。如发现效率低下的使用模式,则需进一步优化应用或硬件配置。

  • 潜在风险预判:基于当前和历史数据,预测在预定生产任务或其他可能增加系统负载的情况下,CPU是否有可能超出警戒值,从而可能导致系统性能下降或其他负面影响。

综上,虽然当前CPU使用率80%没有超出警戒值,但为保持系统高效运行并预防未来可能的问题,建议定期监控CPU及其他关键资源的使用情况,并依据历史数据和预测分析调整系统以优化资源使用效率。

说明

  • 示例展示了如何利用 TongAgents 架构实时采集告警事件,并基于事件类型自动分发到各个处理系统。
  • 快速响应模块确保紧急告警即时处理,常规模块则经过分步延时分析,体现了框架在实时监控与深度分析场景下的优秀调度能力。

通过本示例,你可以快速掌握分级告警调度工作流的构建方法,充分体验 TongAgents 框架在复杂事件处理中的优势。