Develop Custom Collectors with Python¶
PythonD is a complete solution for periodically triggering user-defined Python collection scripts.
Configuration¶
Navigate to the conf.d/pythond directory under the DataKit installation directory, copy pythond.conf.sample and rename it to pythond.conf. An example is as follows:
[[inputs.pythond]]
# Python input name
name = 'some-python-inputs' # required
# System environments to run Python
#envs = ['LD_LIBRARY_PATH=/path/to/lib:$LD_LIBRARY_PATH',]
# Python path (absolute path is recommended)
cmd = "python3" # required. python3 is recommended.
# Python scripts relative path
dirs = []
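The comment on cmd recommends an absolute interpreter path. As a minimal sketch (the helper name resolve_interpreter is made up for illustration and is not part of DataKit), the standard library's shutil.which can show what python3 resolves to on the current PATH, so the result can be copied into cmd:

```python
import shutil


def resolve_interpreter(cmd="python3"):
    """Return the path that `cmd` resolves to on PATH, or None if not found."""
    return shutil.which(cmd)


if __name__ == "__main__":
    path = resolve_interpreter()
    print(path if path else "python3 was not found on PATH")
```

Run it as the same user that runs DataKit, since PATH can differ between users and services.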
Python Environment¶
PythonD is currently in alpha and supports Python 3 only. Tested versions:
- 3.10.1
The following dependencies need to be installed:
- requests
Install it with pip:
python3 -m pip install requests
This requires pip. If pip is not available, install it first by following the official pip installation guide.
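As a quick check that the environment is ready, the sketch below reports the interpreter version and whether the required module can be imported. The helper name missing_modules is hypothetical; it relies only on the standard library's importlib.util.find_spec.

```python
import importlib.util
import sys

REQUIRED_MODULES = ["requests"]  # the dependency listed in this section


def missing_modules(names):
    """Return the names that this interpreter cannot import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    print("Python", sys.version.split()[0])
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules are importable")
```

Run it with the same interpreter that is configured in cmd, otherwise the result may not reflect what the collector sees.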
Writing User-Defined Scripts¶
Create a directory named after the "Python package name" under the datakit/python.d directory, then create Python scripts (*.py) within this directory.
For example, if the package name is Demo, its directory structure would look like this. Here, demo.py is the Python script, and the filename of the Python script can be customized:
datakit
└── python.d
    └── Demo
        └── demo.py
The Python script needs to inherit from the DataKitFramework class and override the run method.
The source code for the DataKitFramework class is in datakit_framework.py, located at datakit/python.d/core/datakit_framework.py.
Python Script Source Code Example
#encoding: utf-8

from datakit_framework import DataKitFramework

class Demo(DataKitFramework):
    name = 'Demo'
    interval = 10 # triggered interval seconds.

    # If your DataKit IP is 127.0.0.1 and the port is 9529, you don't need this;
    # leave it commented out.
    # def __init__(self, **kwargs):
    #     super().__init__(ip = '127.0.0.1', port = 9529)

    # General report example.
    def run(self):
        print("Demo")
        data = [
            {
                "measurement": "abc",
                "tags": {
                    "t1": "b",
                    "t2": "d"
                },
                "fields": {
                    "f1": 123,
                    "f2": 3.4,
                    "f3": "strval"
                },
                # "time": 1624550216 # you don't need this
            },
            {
                "measurement": "def",
                "tags": {
                    "t1": "b",
                    "t2": "d"
                },
                "fields": {
                    "f1": 123,
                    "f2": 3.4,
                    "f3": "strval"
                },
                # "time": 1624550216 # you don't need this
            }
        ]
        in_data = {
            'M':data, # 'M' for metrics, 'L' for logging, 'R' for rum, 'O' for object, 'CO' for custom object, 'E' for event.
            'input': "datakitpy"
        }
        return self.report(in_data) # you must call self.report here

    # # KeyEvent report example.
    # # Keep only one of the feed_*_event calls; code after the first return never runs.
    # def run(self):
    #     print("Demo")
    #     tags = {"tag1": "val1", "tag2": "val2"}
    #     date_range = 10
    #     status = 'info'
    #     event_id = 'event_id'
    #     title = 'title'
    #     message = 'message'
    #     kwargs = {"custom_key1":"custom_value1", "custom_key2": "custom_value2", "custom_key3": "custom_value3"}
    #     # Feed df_source=user event.
    #     user_id="user_id"
    #     return self.feed_user_event(
    #         user_id,
    #         tags, date_range, status, event_id, title, message, **kwargs
    #     )
    #     # Feed df_source=monitor event.
    #     dimension_tags='{"host":"web01"}' # dimension_tags must be the String(JSON format).
    #     return self.feed_monitor_event(
    #         dimension_tags,
    #         tags, date_range, status, event_id, title, message, **kwargs
    #     )
    #     # Feed df_source=system event.
    #     return self.feed_system_event(
    #         tags, date_range, status, event_id, title, message, **kwargs
    #     )

    # # metrics, logging, object example.
    # # Keep only one of the feed_* calls; code after the first return never runs.
    # def run(self):
    #     print("Demo")
    #     measurement = "mydata"
    #     tags = {"tag1": "val1", "tag2": "val2"}
    #     fields = {"custom_field1": "val1","custom_field2": 1000}
    #     kwargs = {"custom_key1":"custom_value1", "custom_key2": "custom_value2", "custom_key3": "custom_value3"}
    #     # Feed metrics example.
    #     return self.feed_metric(
    #         measurement=measurement,
    #         tags=tags,
    #         fields=fields,
    #         **kwargs
    #     )
    #     # Feed logging example.
    #     message = "This is the message for testing"
    #     return self.feed_logging(
    #         source=measurement,
    #         tags=tags,
    #         message=message,
    #         **kwargs
    #     )
    #     # Feed object example.
    #     name = "name"
    #     return self.feed_object(
    #         cls=measurement,
    #         name=name,
    #         tags=tags,
    #         fields=fields,
    #         **kwargs
    #     )
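The dictionary passed to self.report in the general example has a fixed shape: a category key mapped to a list of points, plus an input name. The sketch below builds that shape with two small helpers. The names make_point and make_payload are hypothetical and not part of the DataKit SDK; the category keys are the ones listed in the example's comment.

```python
# Category keys as listed in the example's comment.
CATEGORIES = {
    "M": "metrics",
    "L": "logging",
    "R": "rum",
    "O": "object",
    "CO": "custom object",
    "E": "event",
}


def make_point(measurement, fields, tags=None, time=None):
    """Build one point dict in the shape used by the general report example."""
    point = {
        "measurement": measurement,
        "tags": dict(tags or {}),
        "fields": dict(fields),
    }
    if time is not None:  # the example notes that time can be omitted
        point["time"] = time
    return point


def make_payload(category, points, input_name="datakitpy"):
    """Wrap points in the dict shape that the example passes to self.report."""
    if category not in CATEGORIES:
        raise ValueError(f"unknown category key: {category!r}")
    return {category: list(points), "input": input_name}
```

Inside run, this would be used as `return self.report(make_payload("M", [make_point("abc", {"f1": 123}, {"t1": "b"})]))`.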
Python SDK API definition (for more details, see datakit_framework.py):
- Reporting metrics data: feed_metric(self, input=None, measurement=None, tags=None, fields=None, time=None, **kwargs)
- Reporting logging data: feed_logging(self, input=None, source=None, tags=None, message=None, time=None, **kwargs)
- Reporting object data: feed_object(self, input=None, cls=None, name=None, tags=None, fields=None, time=None, **kwargs) (cls stands for class. Since class is a Python keyword, it's abbreviated as cls.)
Reporting Pythond Events¶
You can use the following three built-in functions to report events:
- Reporting events where df_source = user: feed_user_event(self, df_user_id=None, tags=None, df_date_range=10, df_status=None, df_event_id=None, df_title=None, df_message=None, **kwargs)
- Reporting events where df_source = monitor: feed_monitor_event(self, df_dimension_tags=None, tags=None, df_date_range=10, df_status=None, df_event_id=None, df_title=None, df_message=None, **kwargs)
- Reporting events where df_source = system: feed_system_event(self, tags=None, df_date_range=10, df_status=None, df_event_id=None, df_title=None, df_message=None, **kwargs)
Common event field descriptions:
Field Name | Type | Required | Description |
---|---|---|---|
df_date_range | Integer | Yes | Time range, in seconds |
df_source | String | Yes | Data source. Possible values: system, monitor, user |
df_status | Enum | Yes | Status. Possible values: ok, info, warning, error, critical, nodata |
df_event_id | String | Yes | Event ID |
df_title | String | Yes | Title |
df_message | String | No | Detailed description |
{other fields} | kwargs, e.g., k1=5, k2=6 | No | Other additional fields |
- When df_source = monitor:
It indicates an event generated by Guance monitoring features, with the following additional fields:
Additional Field Name | Type | Required | Description |
---|---|---|---|
df_dimension_tags | String (JSON format) | Yes | Monitoring dimension tags, e.g., {"host":"web01"} |
- When df_source = user:
It indicates an event directly created by users, with the following additional fields:
Additional Field Name | Type | Required | Description |
---|---|---|---|
df_user_id | String | Yes | User ID |
- When df_source = system:
It indicates an event generated by the system, with no additional fields.
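Two details in the tables above are easy to get wrong: df_status must be one of the listed values, and df_dimension_tags must be a JSON string rather than a dict. The sketch below prepares keyword arguments for feed_monitor_event with both points handled. The helper monitor_event_args is hypothetical; only the parameter names come from the signature shown earlier.

```python
import json

# Allowed values for df_status, as listed in the field table.
VALID_STATUSES = {"ok", "info", "warning", "error", "critical", "nodata"}


def monitor_event_args(dimensions, status, event_id, title,
                       message=None, date_range=10, tags=None):
    """Build keyword arguments for feed_monitor_event.

    `dimensions` is a plain dict; it is serialized to a JSON string
    because df_dimension_tags is documented as String (JSON format).
    """
    if status not in VALID_STATUSES:
        raise ValueError(f"invalid df_status: {status!r}")
    args = {
        "df_dimension_tags": json.dumps(dimensions, separators=(",", ":")),
        "tags": dict(tags or {}),
        "df_date_range": date_range,
        "df_status": status,
        "df_event_id": event_id,
        "df_title": title,
    }
    if message is not None:  # df_message is optional
        args["df_message"] = message
    return args
```

Inside run, the result would be passed as `return self.feed_monitor_event(**monitor_event_args({"host": "web01"}, "info", "event_id", "title"))`.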
Example usage:
#encoding: utf-8

from datakit_framework import DataKitFramework

class Demo(DataKitFramework):
    name = 'Demo'
    interval = 10 # triggered interval seconds.

    # If your DataKit IP is 127.0.0.1 and the port is 9529, you don't need this;
    # leave it commented out.
    # def __init__(self, **kwargs):
    #     super().__init__(ip = '127.0.0.1', port = 9529)

    # KeyEvent report example.
    # Keep only one of the three feed_*_event calls below;
    # code after the first return never runs.
    def run(self):
        print("Demo")
        tags = {"tag1": "val1", "tag2": "val2"}
        date_range = 10
        status = 'info'
        event_id = 'event_id'
        title = 'title'
        message = 'message'
        kwargs = {"custom_key1":"custom_value1", "custom_key2": "custom_value2", "custom_key3": "custom_value3"}

        # Feed df_source=user event.
        user_id="user_id"
        return self.feed_user_event(
            df_user_id=user_id,
            tags=tags, df_date_range=date_range, df_status=status, df_event_id=event_id, df_title=title, df_message=message, **kwargs
        )

        # Feed df_source=monitor event.
        dimension_tags='{"host":"web01"}' # dimension_tags must be the String(JSON format).
        return self.feed_monitor_event(
            df_dimension_tags=dimension_tags,
            tags=tags, df_date_range=date_range, df_status=status, df_event_id=event_id, df_title=title, df_message=message, **kwargs
        )

        # Feed df_source=system event.
        return self.feed_system_event(
            tags=tags, df_date_range=date_range, df_status=status, df_event_id=event_id, df_title=title, df_message=message, **kwargs
        )
Git Support¶
Git repositories are supported. Once this feature is enabled, the paths given in the collector configuration (the dirs setting) are resolved relative to gitrepos. For instance, with the layout below, dirs should be set to mytest:
├── datakit
└── gitrepos
    └── myconf
        ├── conf.d
        │   └── pythond.conf
        └── python.d
            └── mytest
                └── mytest.py
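To confirm that a checked-out repository matches the layout above, a small script can search for the package directory. This is only an illustration of the directory layout: the helper find_package_dirs is hypothetical and does not reproduce how DataKit itself resolves paths.

```python
from pathlib import Path


def find_package_dirs(gitrepos_root, package):
    """Return directories matching <gitrepos>/<repo>/python.d/<package>."""
    root = Path(gitrepos_root)
    matches = root.glob(f"*/python.d/{package}")
    return sorted(p for p in matches if p.is_dir())
```

For the tree shown above, `find_package_dirs("gitrepos", "mytest")` would return the single path gitrepos/myconf/python.d/mytest when run from the DataKit installation directory.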
Complete Example¶
Step 1: Write a class that inherits from DataKitFramework:
from datakit_framework import DataKitFramework

class MyTest(DataKitFramework):
    name = 'MyTest'
    interval = 10 # triggered interval seconds.

    # If your DataKit IP is 127.0.0.1 and the port is 9529, you don't need this;
    # leave it commented out.
    # def __init__(self, **kwargs):
    #     super().__init__(ip = '127.0.0.1', port = 9529)

    def run(self):
        print("MyTest")
        data = [
            {
                "measurement": "abc",
                "tags": {
                    "t1": "b",
                    "t2": "d"
                },
                "fields": {
                    "f1": 123,
                    "f2": 3.4,
                    "f3": "strval"
                },
                # "time": 1624550216 # you don't need this
            },
            {
                "measurement": "def",
                "tags": {
                    "t1": "b",
                    "t2": "d"
                },
                "fields": {
                    "f1": 123,
                    "f2": 3.4,
                    "f3": "strval"
                },
                # "time": 1624550216 # you don't need this
            }
        ]
        in_data = {
            'M':data,
            'input': "datakitpy"
        }
        return self.report(in_data) # you must call self.report here
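Before deploying, the logic inside run can be dry-run on a workstation that has no DataKit installed. The sketch below does this with a stand-in base class. FakeDataKitFramework is hypothetical and is not the real DataKitFramework: it only records what report receives, whereas the real class sends the data to DataKit.

```python
class FakeDataKitFramework:
    """Hypothetical stand-in for DataKitFramework; it only records payloads."""

    def __init__(self):
        self.reported = []

    def report(self, in_data):
        # The real class sends data to DataKit; this stub just stores it.
        self.reported.append(in_data)
        return in_data


class MyTestDryRun(FakeDataKitFramework):
    name = "MyTest"
    interval = 10  # seconds; unused by the stub, kept to mirror the real class

    def run(self):
        data = [
            {
                "measurement": "abc",
                "tags": {"t1": "b", "t2": "d"},
                "fields": {"f1": 123, "f2": 3.4, "f3": "strval"},
            }
        ]
        return self.report({"M": data, "input": "datakitpy"})


if __name__ == "__main__":
    collector = MyTestDryRun()
    print(collector.run())
```

Once the payload looks right, switch the base class back to DataKitFramework and import it from datakit_framework as in Step 1.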
Step 2: We will not enable the git repo feature here. Place test.py in the mytest folder under python.d:
datakit
└── python.d
    └── mytest
        └── test.py
Step 3: Configure pythond.conf:
[[inputs.pythond]]
# Python collector name
name = 'some-python-inputs' # required
# Environment variables needed to run the Python collector
#envs = ['LD_LIBRARY_PATH=/path/to/lib:$LD_LIBRARY_PATH',]
# Path to the executable for the Python collector (absolute path is recommended)
cmd = "python3" # required. python3 is recommended.
# Relative path to user scripts (enter the folder name; all modules and .py files in the immediate subdirectory will be applied)
dirs = ["mytest"]
Step 4: Restart DataKit:
FAQ¶
How to Troubleshoot Errors¶
If the results do not meet expectations, check the following log files:
~/_datakit_pythond_cli.log
_datakit_pythond_framework_[pythond name]_.log
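When checking these logs, the most recent lines are usually the relevant ones. The sketch below prints the tail of the CLI log from the home directory. The helper name tail is hypothetical, and the path is taken from the first entry above; the framework log's directory is not stated here, so it is not covered.

```python
import os
from collections import deque
from pathlib import Path


def tail(path, n=20):
    """Return the last n lines of a text file, or [] if it does not exist."""
    path = Path(path)
    if not path.is_file():
        return []
    with path.open("r", encoding="utf-8", errors="replace") as fh:
        # deque with maxlen keeps only the final n lines while reading.
        return [line.rstrip("\n") for line in deque(fh, maxlen=n)]


if __name__ == "__main__":
    for line in tail(os.path.expanduser("~/_datakit_pythond_cli.log")):
        print(line)
```

Run it as the user that DataKit runs as, since ~ expands to that user's home directory.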