为云资源上报数据添加额外的 Tags¶
1. 背景¶
一般情况下,采集器从云厂商处获取资源后仅提取部分普遍重要的属性做 tags,如此对于一些用户来说还不够。本文将介绍如何为采集后(上报前)的数据补充额外的 tags。
2. 方案¶
在不修改官方采集器的前提下,采集器本身提供了 after_collect 参数,用户可赋值一个函数,对采集后的数据做二次处理,其中就包括添加额外的 tags。
def handler(point):
point['tags']['origin'] = 'shanghai'
return point
@DFF.API('xxx Collection', timeout=3600, fixed_crontab='* * * * *')
def run():
Runner(main.DataCollector(account, collector_configs, after_collect=handler), debug=True).run()
上面示例省略了无关配置,重点关注 handler 函数,该函数仅支持一个参数 point, point是采集器即将上报的数据,数据结构可参考相关采集器文档「数据上报格式」,可以肯定的是point一定包含三个字段分别是measurement、tags、fields (需要详细了解的同学,可自行查找行协议相关文档)。我们重点关注的就是point.tags字段,将待补充的健值对插入 tags 中即可,示例中相当于给 point.tags 添加一个key为origin,value为shanghai的健值对。
3. 案例¶
将 AWS 控制台配置的 EC2 tags,补充到采集器采集的 EC2 对象数据的tags中
情景一:直接从point.fields中提取Tags字段补充到point.tags
account = {
'ak_id' : DFF.ENV('aws_develop_test')['ak_id'],
'ak_secret' : DFF.ENV('aws_develop_test')['ak_secret'],
}
collector_configs = {
'regions': ['cn-northwest-1']
}
from integration_core__runner import Runner
import integration_aws_ec2__main as main
from integration_core__utils import json_loads
def add_tags(point):
# 如果 point.fields 中存在云资源的 Tags,直接取
cloud_tags = json_loads(point['fields'].get('Tags'))
if not cloud_tags:
return point
for t in cloud_tags:
t_key = t['Key']
t_v = t['Value']
# 对于已经存在的 tags 不需要被替换(区分大小写)
protected_tags = [k.lower() for k in point['tags'].keys()]
if t_key.lower() in protected_tags:
continue
# 对于一些双下划线开头结尾的 tags 谨慎补充,以下是禁止补充
if t_key.startswith('__') and t_key.endswith('__'):
continue
point['tags'][t_key] = t_v
return point
@DFF.API('AWS-EC2 Collection', timeout=3600, fixed_crontab='*/15 * * * *')
def run():
Runner(main.DataCollector(account, collector_configs, after_collect=add_tags)).run()
情景二:并非所有采集器的 point.fields 中都存在Tags字段(持续支持中。..),如果没有支持,需要从云厂商开放的 API 中获取(也可能客户自己的 API):
account = {
'ak_id' : DFF.ENV('aws_develop_test')['ak_id'],
'ak_secret' : DFF.ENV('aws_develop_test')['ak_secret'],
}
# collector configuration
collector_configs = {
'regions': ['cn-northwest-1']
}
from integration_core__runner import Runner
import integration_aws_ec2__main as main
from integration_core__utils import json_loads
from integration_core__client import AWS
def add_tags(point):
# 如果 point.fields 中不存在云资源的 Tags,可以调用云 API 获取
client = AWS(**account)
region_id = point['tags']['RegionId']
instance_id = point['tags']['InstanceId']
biz_params = {
'Filters': [
{
'Name': 'resource-id',
'Values': [
instance_id,
]
}
]
}
api_res = client.do_api(action='describe_tags', product='ec2', region_id=region_id, **biz_params)
if not api_res:
return point
cloud_tags = api_res.get('Tags')
if not cloud_tags:
return point
for t in cloud_tags:
t_key = t['Key']
t_v = t['Value']
# 对于已经存在的 tags 不需要被替换(区分大小写)
protected_tags = [k.lower() for k in point['tags'].keys()]
if t_key.lower() in protected_tags:
continue
# 对于一些双下划线开头结尾的 tags key 谨慎补充,该 demo 直接禁止补充
if t_key.startswith('__') and t_key.endswith('__'):
continue
point['tags'][t_key] = t_v
return point
@DFF.API('AWS-EC2 Collection', timeout=3600, fixed_crontab='*/15 * * * *')
def run():
Runner(main.DataCollector(account, collector_configs, after_collect=add_tags)).run()
4. 重点注意事项¶
- 云产品采集器中,自定义对象 tags 会自动补充到关联的指标 tags 中,所以如果您既开启自定义对象采集器,又开启了云监控采集器,需要补充 tags 时,只需要补充给对象采集器即可。
- 为采集器上报数据补充 tags 要特别注意,有的字段不能被覆盖,比如自定义对象的
name字段,建议像案例一样,如果原数据 tags 存在相同的 key 就不要再补充进去了,防止出现意外情况。 after_collect所赋值的函数只接收一个参数point,处理过point后函数必须返回一个/多个point,如果没有返回或者处理过程中抛错,一律按照未经函数处理的原始数据上报,当定义了after_collect函数无效时,首先排查这种可能。