跳转至

Arbiter


Arbiter 是 SIEM 的数据分析引擎。

创建并编辑 SIEM 检测规则时,Arbiter 通过执行编写的脚本来对数据进行处理,并生成事件。Arbiter 提供一系列的内置函数,对于脚本的输入输出相关的函数有:dql 函数,用于从观测云查询数据、trigger 函数用处触发事件、printf 函数将信息输出到标准输出等函数。

快速开始

以统计今天较昨天新增访问 IP 为例,脚本如下:

使用 DQL 语句 R::`resource`:(distinct(`ip`) as ip) [2d:1d] 查询一天前用户访问的去重 IP 数据。

v = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
# 将结果输出到 stdout
printf("%v", v)

执行脚本的结果为:

{"series":[[{"columns":{"ip":"120.20.000.79","time":1747041737200},"tags":null},{"columns":{"ip":"120.130.000.85","time":1747031791143},"tags":null},{"columns":{"ip":"153.30.000.2","time":1747030318384},"tags":null}]],"status_code":200}

在未使用 by 语句进行分组时,DQL 查询结果的 series 列表通常只有一个元素(即仅包含一条时间线)。

我们需要对原始结果进行处理,以获取 IP 列表,具体可通过以下两种方式实现:

该函数获取所有的时间线,返回一个二维列表;如果字段不存在,则使用 nil 占位:

  • 脚本:
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ips = dql_series_get(result_dql, "ip")

printf("%v", ips)
  • 标准输出:
["120.20.000.79","120.130.000.85","153.30.000.2"]

  • 脚本:
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ips = []
for series in result_dql["series"] {
    for elem in series {
        # 已知在 columns 中有 ip 字段,而不是 tags 中
        if "columns" in elem && "ip" in elem["columns"] {
            ips = append(ips, elem["columns"]["ip"])
        } else {
            # 在函数 dql_series_get,对于没有该字段的,会添加 nil 占位
            ips = append(ips, nil)
        }
    }
}
printf("%v", ips)
  • 标准输出:
["120.250.000.179","120.130.000.185","153.30.000.42"]

完成 IP 列表获取后,即可开始对比今日数据与昨日 IP 列表数据。脚本参考如下:

# 昨天的
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")

ip_yesterday = dql_series_get(result_dql, "ip")

# 今天的
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [1d]")
ip_today = dql_series_get(result_dql, "ip")


# 建议判断 len(ip_today) > 0 与否,如果下标越界,运行时会报错
new_ips = []
for s in ip_today[0] {
    if s == nil {
        continue
    }
    if !(s in ip_yesterday[0]) {
        new_ips = append(new_ips, s)
    }
}

# 触发新增 IP 检测事件
trigger(
    result=new_ips,
    status="info",
    dimension_tags={
        "user_cron_job_": "new_ips_check",
        "data_category": "rum"
    },
    related_data={
        "IPs": new_ips
    }
)

trigger 函数支持多次触发,由于只执行了一次,当前结果中只有一个元素,其结果可在网页上作为模板变量使用,如 {{Result}} 对应下面的 result 等:

[
  {
    "result": [
        "116.60.000.211",
        "14.150.000.99",
        "58.240.000.245"
    ],
    "status": "info",
    "dimension_tags": {
      "data_category": "rum",
      "user_cron_job_": "new_ips_check"
    },
    "related_data": {
      "IPs": [
        "116.60.000.211",
        "14.150.000.99",
        "58.240.000.245"
      ]
    }
  }
]

场景示例

异常 log 识别地理位置变更

# 查询时间范围会自动设置,定时触发时
user_id_data = dql("L::access_log:(distict(user_id) as user_id)")

user_ids = dql_series_get(user_id_data, "user_id")
# user_ids 值如 [["user_id_1", "user_id_2", "user_id_3"]]

user_ids = user_ids[0]

for uid in user_ids {
    # 查询用户访问国家信息,去重
    country_data = dql(strfmt(
        "L::access_log:(distict(country) as country) {user_id = \"%s\"}", uid)) 

    country_list = dql_series_get(country_data, "country")

    # 超过一个国家
    if len(country_list) == 1 && len(country_list[0]) > 1 {
        trigger( # 触发一次事件
            result=uid,
            related_data = country_list[0]
        )
    }
}

检测 URL 访问顺序

# 获取检测周期内的所有 session id
session_id_data = dql("R::view:(distinct(session_id) as session_id)")

session_id_list = dql_series_get(session_id_data, "session_id")
session_id_list = session_id_list[0]

for session_id in session_id_list {
    # 获取所有 view path, 对于指定的 session_id 
    view_data = dql(strfmt("R::view:(view_path) {session_id = \"%s\"} order by time asc", session_id))

    view_path = dql_series_get(view_data, "view_path")
    view_path = view_path[0]

    pos_login_api = -1
    pos_send_msg_api = -1

    for i = 0; i < len(view_path); i++ {
        if pos_login_api < 0 && view_path[i] == "/login" {
            pos_login_api = i
        }
        if pos_send_msg_api < 0 && view_path[i] == "/send_message" {
            pos_send_msg_api = i
        }

        if pos_login_api == -1 && pos_send_msg_api >= 0 {
            trigger(
                result= strfmt("在这个 session(%s) 中,login 未访问就访问了 send_msg", session_id)
            )
        }

        # 由于是正序,第一个元素是最早的访问时间
        if pos_login_api > pos_send_msg_api {
            # 触发事件
            trigger(
                result= strfmt("在这个 session(%s) 中,login 后于 send_msg 访问", session_id)
            )
        }
    }
}

某个 Token 被多个 IP 访问

token_data = dql("L::access_log:(distict(token) as token)")

token_list = dql_series_get(token_data, "token")
token_list = token_list[0]

for token in token_list {
    token_ip_data = dql(strfmt(
        "L::access_log:(distinc(ip) as ip) {token = \"%s\"}", token)
    ip_list = dql_series_get(token_ip_data, "ip")

    if len(ip_list) == 0 && len(ip_list[0]) > 1  {
        # 触发事件
        trigger(
            result = strfmt("token %s 被多个 ip 访问, IPs: %s", token, str_join(ip_list[0], ", "))
        )
    }
}

文档评价

文档内容是否对您有帮助? ×