跳转至

Arbiter


Arbiter 是 SIEM(Security Information and Event Management) 的数据分析引擎,通过 Arbiter 可以对日志、链路、指标等类别的数据进行分析,并生成事件。

Arbiter 通过执行编写的脚本来对数据进行处理,并生成事件。Arbiter 提供一系列的内置函数,对于脚本的输入输出相关的函数有:dql 函数,用于从观测云查询数据、trigger 函数用处触发事件、printf 函数将信息输出到标准输出等函数。

快速开始

第一个脚本

以统计今天较昨天新增访问 IP 为例,脚本如下:

使用 DQL 语句 R::`resource`:(distinct(`ip`) as ip) [2d:1d] 查询一天前用户访问的去重 IP 数据。

v = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
# 将结果输出到 stdout
printf("%v", v)

执行脚本的结果为:

{"series":[[{"columns":{"ip":"120.20.000.79","time":1747041737200},"tags":null},{"columns":{"ip":"120.130.000.85","time":1747031791143},"tags":null},{"columns":{"ip":"153.30.000.2","time":1747030318384},"tags":null}]],"status_code":200}

在未使用 by 语句进行分组时,DQL 查询结果的 series 列表通常只有一个元素(即仅包含一条时间线)。

我们需要对原始结果进行处理,以获取 IP 列表,具体可通过以下两种方式实现:

  1. 通过 dql_series_get 函数:

    该函数获取所有的时间线,返回一个二维列表;如果字段不存在,则使用 nil 占位:

    • 脚本:
    result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
    ips = dql_series_get(result_dql, "ip")
    
    printf("%v", ips)
    
    • 标准输出:
    [["120.20.000.79","120.130.000.85","153.30.000.2"]]
    
  2. 通过 for 循环遍历:

    • 脚本:
    result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
    ips = []
    for series in result_dql["series"] {
        for elem in series {
            # 已知在 columns 中有 ip 字段,而不是 tags 中
            if "columns" in elem && "ip" in elem["columns"] {
                ips = append(ips, elem["columns"]["ip"])
            } else {
                # 在函数 dql_series_get,对于没有该字段的,会添加 nil 占位
                ips = append(ips, nil)
            }
        }
    }
    printf("%v", ips)
    
    • 标准输出:
    ["120.250.000.179","120.130.000.185","153.30.000.42"]
    

完成 IP 列表获取后,即可开始对比今日数据与昨日IP列表数据。脚本参考如下:

# 昨天的
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")

ip_yesterday = dql_series_get(result_dql, "ip")

# 今天的
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [1d]")
ip_today = dql_series_get(result_dql, "ip")


# 建议判断 len(ip_today) > 0 与否,如果下标越界,运行时会报错
new_ips = []
for s in ip_today[0] {
    if s == nil {
        continue
    }
    if !(s in ip_yesterday[0]) {
        new_ips = append(new_ips, s)
    }
}

# 触发新增 IP 检测事件
trigger(
    result=new_ips,
    status="info",
    dimension_tags={
        "user_cron_job_": "new_ips_check",
        "data_category": "rum"
    },
    related_data={
        "IPs": new_ips
    }
)

trigger 函数函数支持多次触发,由于只执行了一次,当前结果中只有一个元素,其结果可在网页上作为模板变量使用,如 {{Result}} 对应下面的 result 等:

[
  {
    "result": [
        "116.60.000.211",
        "14.150.000.99",
        "58.240.000.245"
    ],
    "status": "info",
    "dimension_tags": {
      "data_category": "rum",
      "user_cron_job_": "new_ips_check"
    },
    "related_data": {
      "IPs": [
        "116.60.000.211",
        "14.150.000.99",
        "58.240.000.245"
      ]
    }
  }
]

文档评价

文档内容是否对您有帮助? ×