Arbiter¶
Arbiter 是 SIEM(Security Information and Event Management) 的数据分析引擎,通过 Arbiter 可以对日志、链路、指标等类别的数据进行分析,并生成事件。
Arbiter 通过执行编写的脚本来对数据进行处理,并生成事件。Arbiter 提供一系列的内置函数,对于脚本的输入输出相关的函数有:dql
函数,用于从观测云查询数据、trigger
函数用处触发事件、printf
函数将信息输出到标准输出等函数。
快速开始¶
第一个脚本¶
以统计今天较昨天新增访问 IP 为例,脚本如下:
使用 DQL 语句 R::`resource`:(distinct(`ip`) as ip) [2d:1d]
查询一天前用户访问的去重 IP 数据。
执行脚本的结果为:
{"series":[[{"columns":{"ip":"120.20.000.79","time":1747041737200},"tags":null},{"columns":{"ip":"120.130.000.85","time":1747031791143},"tags":null},{"columns":{"ip":"153.30.000.2","time":1747030318384},"tags":null}]],"status_code":200}
在未使用 by 语句进行分组时,DQL 查询结果的 series
列表通常只有一个元素(即仅包含一条时间线)。
我们需要对原始结果进行处理,以获取 IP 列表,具体可通过以下两种方式实现:
-
通过
dql_series_get
函数:该函数获取所有的时间线,返回一个二维列表;如果字段不存在,则使用
nil
占位:- 脚本:
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]") ips = dql_series_get(result_dql, "ip") printf("%v", ips)
- 标准输出:
-
通过
for
循环遍历:- 脚本:
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]") ips = [] for series in result_dql["series"] { for elem in series { # 已知在 columns 中有 ip 字段,而不是 tags 中 if "columns" in elem && "ip" in elem["columns"] { ips = append(ips, elem["columns"]["ip"]) } else { # 在函数 dql_series_get,对于没有该字段的,会添加 nil 占位 ips = append(ips, nil) } } } printf("%v", ips)
- 标准输出:
完成 IP 列表获取后,即可开始对比今日数据与昨日IP列表数据。脚本参考如下:
# 昨天的
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ip_yesterday = dql_series_get(result_dql, "ip")
# 今天的
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [1d]")
ip_today = dql_series_get(result_dql, "ip")
# 建议判断 len(ip_today) > 0 与否,如果下标越界,运行时会报错
new_ips = []
for s in ip_today[0] {
if s == nil {
continue
}
if !(s in ip_yesterday[0]) {
new_ips = append(new_ips, s)
}
}
# 触发新增 IP 检测事件
trigger(
result=new_ips,
status="info",
dimension_tags={
"user_cron_job_": "new_ips_check",
"data_category": "rum"
},
related_data={
"IPs": new_ips
}
)
trigger
函数函数支持多次触发,由于只执行了一次,当前结果中只有一个元素,其结果可在网页上作为模板变量使用,如 {{Result}}
对应下面的 result
等: