Reference Table¶
通过 Reference Table 功能,Pipeline 支持导入外部数据进行数据处理。
Attention
该功能内存消耗较高,参考 150 万行磁盘占用约 200MB (JSON 文件) 的不重复数据 (string 类型两列;int, float, bool 各一列) 为例,其内存占用维持在 950MB ~ 1.2GB, 更新时的峰值内存 2.2GB ~ 2.7GB。可以通过配置 use_sqlite = true
,将数据保存到磁盘上。
表结构与列的数据类型¶
表结构为一个二维表,表与表之间通过表名区分,需要至少存在一列,各列内的元素的数据类型必须一致,且数据类型需为 int(int64), float(float64), string, bool 之一。
暂未支持给表设置主键,但是可以通过任意列进行查询,并将查到的所有结果中的第一行作为查询结果。以下为一个表结构示例:
-
表名:
refer_table_abc
-
列名(col1, col2, ...)、列数据类型(int, float, ...)、行数据:
col1: int | col2: float | col3: string | col4: bool |
---|---|---|---|
1 | 1.1 | "abc" | true |
2 | 3 | "def" | false |
从外部导入数据¶
在配置文件 datakit.conf
中配置 reference table url 与拉取间隔(默认间隔为 5 分钟)
Attention
目前要求 refer_table_url 指定的地址,其 HTTP 返回的 Content-Type 必须为 Content-Type: application/json
。
- 数据由多个 table 构成列表,每个表由一个 map 构成,map 中的字段为:
字段名 | table_name | column_name | column_type | row_data |
---|---|---|---|---|
描述 | 表名 | 所有列名 | 列数据类型,需要与列名对应,值范围 "int", "float", "string", "bool" | 多个行数据,对于 int, float, bool 类型可以使用对应类型数据或转换成字符串表示;[]any 中元素需与列名以及列类型一一对应 |
数据类型 | string | [ ]string | [ ]string | [ ][ ]any |
- JSON 结构:
[
{
"table_name": string,
"column_name": []string{},
"column_type": []string{},
"row_data": [
[]any{},
...
]
},
...
]
- 示例:
[
{
"table_name": "table_abc",
"column_name": ["col", "col2", "col3", "col4"],
"column_type": ["string", "float", "int", "bool"],
"row_data": [
["a", 123, "123", "false"],
["ab", "1234.", "123", true],
["ab", "1234.", "1235", "false"]
]
},
{
"table_name": "table_ijk",
"column_name": ["name", "id"],
"column_type": ["string", "string"],
"row_data": [
["a", "12"],
["a", "123"],
["ab", "1234"]
]
}
]
使用 SQLite 保存导入数据¶
要将导入的数据保存到 SQLite 数据库中时,只需配置 use_sqlite
为 true
:
[pipeline]
refer_table_url = "http[s]://host:port/path/to/resource"
refer_table_pull_interval = "5m"
use_sqlite = true
sqlite_mem_mode = false
当使用 SQLite 保存数据,且上述 sqlite_mem_mode
设置为 true
时,将使用 SQLite 的内存模式;默认为 SQLite 磁盘模式。
Attention
目前 windows-386 下不支持此功能。
实践示例¶
将上面的 JSON 文本写成文件 test.json
,在 Ubuntu18.04+ 使用 apt
安装 NGINX 后将文件放置于 /var/www/html 下
执行 curl -v localhost/test.json
测试文件是否能通过 HTTP GET 获取到,输出结果大致为
...
< Content-Type: application/json
< Content-Length: 522
< Last-Modified: Tue, 16 Aug 2022 06:20:52 GMT
< Connection: keep-alive
< ETag: "62fb3744-20a"
< Accept-Ranges: bytes
<
[
{
"table_name": "table_abc",
"column_name": ["col", "col2", "col3", "col4"],
"column_type": ["string", "float", "int", "bool"],
"row_data": [
...
在配置文件 datakit.conf
修改 refer_table_url 的值为:
[pipeline]
refer_table_url = "http://localhost/test.json"
refer_table_pull_interval = "5m"
use_sqlite = false
sqlite_mem_mode = false
进入 Datakit pipeline/loggging 目录,并创建测试脚本 refer_table_for_test.p
,并写入以下内容
# 从输入中提取 表名,列名,列值
json(_, table)
json(_, key)
json(_, value)
# 查询并追加当前列的数据,默认作为 field 添加到数据中
query_refer_table(table, key, value)
cd /usr/local/datakit/pipeline/logging
vim refer_table_for_test.p
datakit pipeline -P refer_table_for_test.p -T '{"table": "table_abc", "key": "col2", "value": 1234.0}' --date
由以下输出结果可知,表中列的 col, col2, col3, col4 成功被追加到输出的结果中:
2022-08-16T15:02:14.150+0800 DEBUG refer-table refertable/cli.go:26 performing request[method GET url http://localhost/test.json]
{
"col": "ab",
"col2": 1234,
"col3": 123,
"col4": true,
"key": "col2",
"message": "{\"table\": \"table_abc\", \"key\": \"col2\", \"value\": 1234.0}",
"status": "unknown",
"table": "table_abc",
"time": "2022-08-16T15:02:14.158452592+08:00",
"value": 1234
}