观测云采集 Amazon ECS 日志¶
简介¶
Amazon Elastic Container Service (Amazon ECS) 是一项高度可扩展的快速容器管理服务,可以使用它轻松运行、停止和管理群集上的容器。这些容器可以运行在自己的 EC2 服务器上,也可以运行在由 AWS Fargate 托管的无服务器基础设施。
针对任务使用 Fargate 的启动类型,需要启动容器的 awslogs 日志驱动程序,运行在容器中的应用以 STDOUT 和 STDERR I/O 流的方式输出的日志,会被发送到 CloudWatch Logs 的日志组中,再通过 Func 采集这些日志,Func 再把日志通过 EC2 上部署的 DataKit 写入观测云中。
本文的日志采集就是针对 AWS Fargate 托管的容器。
前置条件¶
- 需要先创建一个观测云账号
- 安装 DataKit
- 安装 Func 携带版
- 已经拥有运行在 ECS 的 Java 应用
这里使用到的 ECS 集群名称是 cluster-docker,下面查看示例的日志及日志组。登录「AWS」,进入「Elastic Container Service」 - 点击「集群」 - 「cluster-docker」。
点击「服务名称」
进入任务
在详细信息标签的容器下面找到日志配置。
点击「日志标签」,里面是应用的日志,接下来采集这些日志。
操作步骤¶
Warning
示例所使用的版本为 DataKit 1.4.18
步骤 1 AWS 配置¶
1.1 用户密钥¶
使用部署 ECS 用到的账号,AWS 创建该用户时提供的 Access key ID
和 Secret access key
后面会使用到。
1.2 设置 AWS 用户权限¶
登录 AWS 的 IAM 控制台,在用户下面找到 ECS 所在的「用户」- 点击「添加权限」。
点击「直接附加现有策略」,「筛选策略」选中 CloudWatchLogsReadOnlyAccess
、CloudWatchEventsReadOnlyAccess
,然后点击「下一步:审核」。
步骤 2 Func 配置¶
2.1 配置环境变量¶
登录「Func」 - 「开发」 - 「环境变量」 - 「添加环境变量」。这里添加 3 个环境变量:
AWS_LOG_KEY
值对应步骤 1.1 中 AWS 用户的Access key ID
AWS_LOG_SECRET_ACCESS_KEY
值对应步骤 1.1中 AWS 用户的Secret access key
AWS_REGION_NAME
值对应 AWS 用户所在的REGION
。
2.2 配置连接器¶
登录「Func」 - 「开发」 - 「连接器」 - 「添加连接器」。
这里 ID 必须填 DataKit,主机对应已安装 DataKit 的地址,端口是 DataKit 的端口。(本示例直接用 IP,所以协议填 HTTP)
点击「测试连通性」,有返回,说明 DataKit 可用。
2.3 PIP 工具配置¶
登录「Func」 - 「管理」 - 「实验性功能」,右侧选中「开启 PIP 工具模块」。
点击左侧的「PIP 工具」 - 选择「阿里云镜像」 - 输入 boto3
- 点击「安装」。
2.4 脚本库¶
登录「Func」 - 「开发」 - 「脚本库」 - 「添加脚本集」,ID 可以自定义,点击「保存」。
找到「AWS 日志采集」 - 点击「添加脚本」。
输入 ID ,示例这里定义为 aws_ecs__log
,点击「保存」。
点击「编辑」。
输入如下内容:
输入内容
import boto3
import json
import time
scope_id='ecs_log'
@DFF.API('aws_ecs log', timeout=500, api_timeout=180)
def run(measurement, logGroupName, interval):
print(measurement, logGroupName, interval)
get_log_data(measurement, logGroupName, interval)
# if data is not None:
# push_log(data)
# else:
# print("None")
def get_cron_time(interval, measurement):
cache = DFF.CACHE.get('last_time_%s' %measurement,scope=scope_id)
if cache == None:
currentTime = int(round(time.time() * 1000))
startTime = currentTime - int(interval) * 1000
endTime = currentTime
else:
currentTime = int(round(time.time() * 1000))
if currentTime - int(cache) > 10 * 60 * 1000:
startTime = currentTime - int(interval) * 1000
endTime = currentTime
else:
startTime = int(cache) + 1
endTime = currentTime
print(startTime, endTime)
return startTime, endTime
def get_log_data(measurement, logGroupName, interval):
logTime = get_cron_time(interval, measurement)
startTime = logTime[0]
endTime = logTime[1]
isPush = False
client = boto3.client(
'logs',
aws_access_key_id=DFF.ENV('AWS_LOG_KEY'),
aws_secret_access_key=DFF.ENV('AWS_LOG_SECRET_ACCESS_KEY'),
region_name=DFF.ENV('AWS_REGION_NAME')
)# print(client.meta.config)
try:
nextToken = 'frist'
logData = []
while nextToken != '':
if nextToken == 'frist':
nextToken = ''
response = client.filter_log_events(
logGroupName=logGroupName,
startTime=startTime,
endTime=endTime,
limit=1000,
#filterPattern="?ERROR ?WARN ?error ?warn",
interleaved=False
)
else:
response = client.filter_log_events(
logGroupName=logGroupName,
startTime=startTime,
endTime=endTime,
nextToken=nextToken,
limit=1000,
#filterPattern="?ERROR ?WARN ?error ?warn",
interleaved=False
)
try:
if len(response['events']) > 0:
data = []
lastTimeList = []
for i in response['events']:
# print("hii", i['logStreamName'])
log = {
'measurement': measurement,
'tags': {
'logGroupName': logGroupName,
'logStreamName': i['logStreamName'],
'host': '127.0.0.1'
},
'fields': {
'message': i['message'],
'time': i['timestamp']
}
}
data.append(log)
lastTimeList.append(i['timestamp'])
push_log(data)
print("max %s" % max(lastTimeList))
DFF.CACHE.set('last_time_%s' % measurement, max(lastTimeList), scope=scope_id, expire=None)
isPush = True
else:
DFF.CACHE.set('last_time_%s' % measurement, endTime , scope=scope_id, expire=None)
nextToken = response['nextToken']
except:
nextToken = ''
except Exception as e:
print('Error: %s' % e )
return None
if not isPush:
DFF.CACHE.set('last_time_%s' % measurement, endTime , scope=scope_id, expire=None)
def push_log(data):
datakit = DFF.SRC('datakit')
status_code, result = datakit.write_logging_many(data=data)
if status_code == 200:
print("total %d" % len(data))
print(status_code, result)
Warning
- 上述内容第4行的
ecs_log
,需要确保同一个 Func 中唯一,可以改成其它字母。 - 第6行的
awc_ecs
即是刚才添加的脚本集 ID - 第 40、41、42 行中的
AWS_LOG_KEY
、AWS_LOG_SECRET_ACCESS_KEY
、AWS_REGION_NAME
对应步骤 2.1 中的环境变量名,如果环境变量名变了,需要做对应修改。
2.5 测试脚本¶
如下图选择「run」,第二个红框所示内容中:
measurement
的值输入ecs_log_source
,这个值对应观测云日志中的日志来源;logGroupName
的值对应前置条件的日志配置中查到的awslogs-group
;interval
的值对应采集频率,示例这里是 60 秒。
点击「执行」,输出total 8
,即上报八条日志。
登录「观测云」,进入「日志」模块,数据源选择「ecs_log_source」,即可看到日志。
点击右上角的「发布」
点击右上角「结束编辑」
2.6 自动采集日志¶
登录「Func」 - 「管理」 - 「自动触发配置」 - 「新建」,参数输入刚才执行的内容。
时间选择每分钟或者每 5 分钟,点击「保存」
在「自动触发配置」列表中存在「aws_ecs log」的记录
点击「近期执行」查看执行情况