Event Data Sharding Practice: Implementation Based on Dataway Sink¶
This document explains how to shard event data (`keyevent`) by injecting HTTP Headers with DataFlux Func and configuring Dataway Sinker rules. With this solution, you can route event data with different business attributes and environment characteristics to specified workspaces.
Solution Principle¶
Data Sharding Process¶
Core Mechanism Description¶
- Injecting identifiers on the DataFlux Func side: During event data reporting, Func dynamically generates the `X-Global-Tags` Header from its configuration. The Header carries the key-value pairs required for sharding (e.g., `env=prod`).
- Dataway routing match: Dataway forwards events carrying specific identifiers to the corresponding workspaces based on the rules defined in `sinker.json`.
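As a rough illustration of the first step, the Header value can be thought of as a serialized set of key-value pairs. The sketch below assumes a comma-separated `key=value` layout; the helper name `build_x_global_tags` is hypothetical and is not part of DataFlux Func.

```python
def build_x_global_tags(tags):
    """Serialize sharding key-value pairs into a Header value.

    Assumption: pairs are joined as comma-separated `key=value` items.
    """
    # Sort the keys so the output is deterministic.
    return ",".join(f"{key}={tags[key]}" for key in sorted(tags))


headers = {"X-Global-Tags": build_x_global_tags({"env": "prod", "host": "web-001"})}
print(headers)
```

Dataway then only needs the Header, not the event body, to decide where the request goes.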
1. Dataway Configuration¶
Before using this feature, ensure that Dataway is deployed and the Sinker sharding function is enabled.
For Sinker configuration, refer to the Dataway Sinker Configuration Guide.
Note: In the deployment version, the Dataway used by the built-in DataFlux Func is `internal-dataway`, located under the `utils` namespace.
2. DataFlux Func Configuration¶
Injecting X-Global-Tags into Headers¶
Core Parameter Description¶
Parameter Name | Type | Description |
---|---|---|
`CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS` | list/string | Defines the generation rules for event data sharding identifiers |
Simple Example¶
Route all workspace events to the "Event Centralized Management" workspace:
- Access the Launcher console.
- In the top-right corner, open Modify Application Configuration.
- Locate the `func2Config` configuration item under the `func2` namespace.
- Add configuration:
- Configure the Dataway Sinker rule: modify the `sinker.json` configuration file and set the data routing rules:
{
  "strict": true,
  "rules": [
    {
      "rules": [
        "{ df_source = 'monitor' }"
      ],
      "url": "workspace data reporting address"
    }
  ]
}
Special Field Description¶
Field Name | Description |
---|---|
`DF_WORKSPACE_UUID` | Workspace ID |
`DF_WORKSPACE_NAME` | Workspace name |
`DF_MONITOR_CHECKER_ID` | Monitor ID |
`DF_MONITOR_CHECKER_NAME` | Monitor name |
More Advanced Configurations¶
Configuration Method | Example | Description |
---|---|---|
Direct Extraction | `- host` | Extract the `host` field from the event data's tags or fields |
Rename Fields | `- src:service; dest:business_type` | Rename the `service` field to `business_type` |
Value Mapping | `remap:{order:e-commerce}` | Map the original value `order` to `e-commerce` |
Default Value | `default:unknown` | Use the default value if the field does not exist |
Fixed Value | `- dest:env; fixed:prod` | Directly inject the fixed value `env=prod` |
Global Tags Generation Rules¶
Field Name | Type | Default Value | Description |
---|---|---|---|
`[#].category` | string/[string] | `"*"` | Matches the Category of the data |
`[#].fields` | string/dict/[string]/[dict] | - | Extracts data fields (including Tags and Fields); supports direct extraction and rule-based extraction |
`[#].fields[#]` | string | - | Name of the field to extract; the special fields are also supported (see the Special Field Description table) |
`[#].fields[#]` | dict | - | Extraction field rule |
`[#].fields[#].src` | string | - | Name of the field to extract; the special fields are also supported (see the Special Field Description table) |
`[#].fields[#].dest` | string | Same as `src` | Field name written into the Header after extraction |
`[#].fields[#].default` | string | - | Default value written into the Header when the specified field does not exist |
`[#].fields[#].fixed` | string | - | Fixed value written into the Header |
`[#].fields[#].remap` | dict | `null` | Mapping used to transform extracted field values |
`[#].fields[#].remap_default` | string | - | Value used when the extracted value has no entry in `remap`. If not specified, the original value is retained. If set to `null`, the field is ignored. |
`[#].filter` | dict/string | `null` | Filters matching data. Supports Tag filtering and filterString filtering. |
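To make the interplay of `src`, `dest`, `default`, `fixed`, `remap`, and `remap_default` concrete, here is a minimal sketch of how one `fields` entry could be evaluated. It is an illustration of the semantics in the table, not the actual Func implementation; the helper `apply_field_rule` and the lookup order (tags before fields before extra fields) are assumptions.

```python
_MISSING = object()  # distinguishes "not specified" from an explicit null


def apply_field_rule(rule, point, extra_fields=None):
    """Return (header_key, value) for one `fields` entry, or None to skip it."""
    if isinstance(rule, str):
        rule = {"src": rule}  # the string form is shorthand for direct extraction

    src = rule.get("src")
    dest = rule.get("dest", src)  # `dest` defaults to `src`

    # A fixed value is written as-is, without reading the event.
    if "fixed" in rule:
        return dest, rule["fixed"]

    # Assumption: tags take precedence over fields, then extra fields.
    merged = {**(extra_fields or {}), **point.get("fields", {}), **point.get("tags", {})}
    value = merged.get(src, rule.get("default"))
    if value is None:
        return None  # field missing and no default configured

    remap = rule.get("remap")
    if remap:
        if value in remap:
            value = remap[value]
        else:
            fallback = rule.get("remap_default", _MISSING)
            if fallback is None:
                return None  # explicit null: ignore the field
            if fallback is not _MISSING:
                value = fallback
    return dest, str(value)


point = {"tags": {"service": "order"}, "fields": {"result": "timeout"}}
rename_rule = {"src": "service", "dest": "business_type", "remap": {"order": "e-commerce"}}
print(apply_field_rule(rename_rule, point))
print(apply_field_rule({"dest": "env", "fixed": "prod"}, point))
```

The sentinel object is what lets the sketch tell an omitted `remap_default` (keep the original value) apart from one set to `null` (drop the field).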
Custom Global Tags Generation Function ID¶
The function ID format is `{script set ID}__{script ID}.{function name}`.
The function receives the following parameters:
Parameter | Type | Description |
---|---|---|
`category` | string | Category, such as `"keyevent"` |
`point` | dict | Single data point to be processed |
`point.measurement` | string | Data measurement |
`point.tags` | dict | Data tags content |
`point.fields` | dict | Data fields content |
`extra_fields` | dict | Additional extracted fields (see the Special Field Description table) |
Example `point` parameter value:
{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001",
    "ip"  : "1.2.3.4"
  },
  "fields": {
    "name": "Tom"
  }
}
Example `extra_fields` parameter value:
{
  "DF_WORKSPACE_UUID"      : "wksp_xxxxx",
  "DF_MONITOR_CHECKER_ID"  : "rul_xxxxx",
  "DF_MONITOR_CHECKER_NAME": "Monitor XXXXX",
  "DF_WORKSPACE_NAME"      : "Workspace XXXXX"
}
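Putting the ID format and the parameters together, a custom function could look like the sketch below. The helper `build_function_id`, the returned key `workspace`, and the convention of returning a dict of Header key-value pairs (or nothing to skip the data) are illustrative assumptions.

```python
def build_function_id(script_set_id, script_id, function_name):
    """Compose an ID in the `{script set ID}__{script ID}.{function name}` format."""
    return f"{script_set_id}__{script_id}.{function_name}"


def make_global_tags(category, point, extra_fields):
    """Hypothetical custom function: tag every event with its source workspace."""
    if category != "keyevent":
        return None  # leave other categories untouched
    return {"workspace": extra_fields.get("DF_WORKSPACE_UUID", "unknown")}


print(build_function_id("my_script_set", "my_script", "make_global_tags"))
print(make_global_tags("keyevent", {"tags": {}, "fields": {}}, {"DF_WORKSPACE_UUID": "wksp_xxxxx"}))
```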
Verification of Generated Effects¶
Example of adding `key:value` in the Header
Writing Event Data to the Same Workspace¶
- Extract fields from events
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - category: keyevent
    fields:
      - host
      - name
      - DF_WORKSPACE_UUID
Example Data
{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001",
    "ip"  : "1.2.3.4"
  },
  "fields": {
    "name": "Tom"
  }
}
Example Written Header
- Extract a single field from events
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - category: keyevent
    # Simplified syntax when only one field exists
    fields: host
Example Data
Example Written Header
Writing All Data to the Same Workspace¶
- Omitting `category` indicates processing all data
Example Configuration
Example Data
Example Written Header
Other Scenarios¶
- Changing field names during extraction
Example Configuration
Example Data
Example Written Header
- Mapping field values during extraction
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields:
      - src: result
        remap:
          OK     : ok
          success: ok
          failed : error
          failure: error
          timeout: error
        remap_default: unknown
Example Data
Example Written Header
- Using default values during extraction
Example Configuration
Example Data
Example Written Header
- Writing fixed values
Example Configuration
Example Data
Example Written Header
- Matching data using Tag methods
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields: host
    filter:
      service: app-*
  - fields: client_ip
    filter:
      service: web-*
Example Data
{
  "measurement": "keyevent",
  "tags": {
    "host"     : "app-001",
    "client_ip": "1.2.3.4",
    "service"  : "app-user"
  },
  "fields": {
    "name": "Tom"
  }
}
Example Written Header
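To reason about which fields the Tag filter configuration selects for the sample event, the matching can be sketched as follows. This assumes glob-style wildcard matching on Tag values and that every entry in a filter must match; `select_fields` is a hypothetical helper, not a Func API.

```python
from fnmatch import fnmatchcase


def select_fields(rules, point):
    """Collect the configured field from every rule whose Tag filter matches."""
    tags = point.get("tags", {})
    selected = {}
    for rule in rules:
        tag_filter = rule.get("filter") or {}
        # Assumption: all filter entries must match, using glob-style wildcards.
        if all(fnmatchcase(str(tags.get(key, "")), pattern)
               for key, pattern in tag_filter.items()):
            name = rule["fields"]
            if name in tags:
                selected[name] = tags[name]
    return selected


rules = [
    {"fields": "host", "filter": {"service": "app-*"}},
    {"fields": "client_ip", "filter": {"service": "web-*"}},
]
point = {
    "measurement": "keyevent",
    "tags": {"host": "app-001", "client_ip": "1.2.3.4", "service": "app-user"},
    "fields": {"name": "Tom"},
}
print(select_fields(rules, point))  # {'host': 'app-001'}
```

Only the first rule applies here, because `app-user` matches `app-*` but not `web-*`.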
- Matching data using filterString methods
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields: host
    filter: 'service:app-*'
  - fields: client_ip
    filter: 'service:web-*'
Example Data
{
  "measurement": "keyevent",
  "tags": {
    "host"     : "app-001",
    "client_ip": "1.2.3.4",
    "service"  : "app-user"
  },
  "fields": {
    "name": "Tom"
  }
}
Example Written Header
- Extracting prefixes or suffixes of event fields using custom functions
Example Configuration
Example function located in script set `my_script_set`, script `my_script`:
def make_global_tags(category, point, extra_fields):
    # Only process event type data
    if category != 'keyevent':
        return

    global_tags_list = {}

    # Get the `name` and `region` fields from the data's `fields` or `tags`
    name = point['fields'].get('name') or point['tags'].get('name')
    region = point['fields'].get('region') or point['tags'].get('region')

    # Get the prefix of `name`
    if name:
        prefix = str(name).split('-')[0]
        global_tags_list['name_prefix'] = prefix

    # Get the suffix of `region`
    if region:
        suffix = str(region).split('-').pop()
        global_tags_list['region_suffix'] = suffix

    # Return the key-value pairs to write into the Header
    return global_tags_list
Example Data
{
  "measurement": "keyevent",
  "tags": {
    "region" : "cn-shanghai",
    "service": "app-user"
  },
  "fields": {
    "name": "Tom-Jerry"
  }
}
Example Written Header
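The key-value pairs that the custom function produces for this sample event can be checked locally. The block below is a condensed, self-contained copy of the same prefix and suffix logic, run against the example data; how Func serializes the returned dict into the Header is not shown and is outside this sketch.

```python
def make_global_tags(category, point, extra_fields):
    """Condensed copy of the example function: name prefix and region suffix."""
    if category != "keyevent":
        return None
    result = {}
    name = point["fields"].get("name") or point["tags"].get("name")
    region = point["fields"].get("region") or point["tags"].get("region")
    if name:
        result["name_prefix"] = str(name).split("-")[0]
    if region:
        result["region_suffix"] = str(region).split("-")[-1]
    return result


point = {
    "measurement": "keyevent",
    "tags": {"region": "cn-shanghai", "service": "app-user"},
    "fields": {"name": "Tom-Jerry"},
}
print(make_global_tags("keyevent", point, {}))
# {'name_prefix': 'Tom', 'region_suffix': 'shanghai'}
```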
Example Reported Event:
{
"measurement": "keyevent",
"tags": { "host": "web-01", "service": "order" },
"fields": { "message": "User order exception" }
}
Generated HTTP Header:
3. Dataway Sinker Rule Configuration¶
Rule File Example (`sinker.json`)¶
In this example, the first rule matches e-commerce events, the second matches a specified workspace, and the last is the default rule, which must exist. The descriptions are kept out of the file itself because standard JSON does not allow comments.
{
  "strict": false,
  "rules": [
    {
      "rules": ["{ business_type = 'e-commerce' }"],
      "url": "https://kodo.guance.com?token=tkn_e-commerce_space_token"
    },
    {
      "rules": ["{ DF_WORKSPACE_UUID = 'wksp_123' }"],
      "url": "https://backup.guance.com?token=tkn_backup_space_token"
    },
    {
      "rules": ["*"],
      "url": "https://default.guance.com?token=tkn_default_space_token"
    }
  ]
}
Rule Syntax Description¶
Operator | Example | Description |
---|---|---|
`=` | `{ env = 'prod' }` | Exact match |
`!=` | `{ env != 'test' }` | Not equal to |
`in` | `{ region in ['cn-east','cn-north'] }` | Multi-value match |
`match` | `{ host match 'web-*' }` | Wildcard match |
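The four operators can be modeled in a few lines to sanity-check a rule against the tags a request carries. This is only a sketch of the semantics in the table: it does not parse the `{ ... }` rule syntax, it treats `match` as a glob-style wildcard, and it assumes a missing key never matches. `check` and `OPERATORS` are hypothetical names.

```python
from fnmatch import fnmatchcase

OPERATORS = {
    "=": lambda actual, expected: actual == expected,
    "!=": lambda actual, expected: actual != expected,
    "in": lambda actual, expected: actual in expected,
    "match": lambda actual, expected: fnmatchcase(actual, expected),
}


def check(tags, key, op, expected):
    """Evaluate one condition against the tags carried in the Header."""
    actual = tags.get(key)
    if actual is None:
        return False  # assumption: a missing key never matches
    return OPERATORS[op](actual, expected)


tags = {"env": "prod", "region": "cn-east", "host": "web-01"}
print(check(tags, "env", "=", "prod"))
print(check(tags, "region", "in", ["cn-east", "cn-north"]))
print(check(tags, "host", "match", "web-*"))
```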
4. Datakit-Side Configuration¶
Basic Configuration¶
# /usr/local/datakit/conf.d/datakit.conf
[dataway]
# Enable Sinker function
enable_sinker = true
# Define sharding basis fields (maximum 3)
global_customer_keys = ["host", "env"]
Notes¶
- Field Type Restrictions: Only string-type fields are supported (all Tag values are strings)
- Binary Data Support: Binary data such as Session Replay and Profiling can also be sharded
- Performance Impact: Each added sharding field increases memory usage by approximately 5%
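The restrictions above (configured keys only, string values only, at most three keys) can be expressed as a small selection step. This is a sketch of the idea, not Datakit's implementation; `pick_customer_tags` is a hypothetical helper.

```python
def pick_customer_tags(point_tags, customer_keys, max_keys=3):
    """Keep only the configured sharding keys that are present as string tags."""
    if len(customer_keys) > max_keys:
        raise ValueError(f"at most {max_keys} sharding keys are supported")
    return {
        key: point_tags[key]
        for key in customer_keys
        if isinstance(point_tags.get(key), str)
    }


point_tags = {"host": "web-01", "env": "prod", "service": "order"}
print(pick_customer_tags(point_tags, ["host", "env"]))
```

Keys that are missing from a data point, or whose values are not strings, are simply left out of the sharding identifier in this sketch.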
5. Impact of Global Tags¶
1. Global Tag Example¶
# datakit.conf
[election.tags]
cluster = "cluster-A" # Global election tag
[global_tags]
region = "cn-east" # Global host tag
2. Merging Logic for Sharding Identifiers¶
Assume the event data contains the following Tags:
Final Sharding Identifier:
Extended Explanation: Sharding for Other Data Types¶
1. Custom Sharding Rules¶
For non-event data (such as `logging` and `metric`), sharding can be achieved by specifying `category`:
# Func Configuration Example: Processing logging data
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - category: logging
    fields:
      - src: log_level
        remap:
          error: Critical Error
          warn: General Warning
      - service
2. General Principles¶
- Isolated Configuration: Different data categories (`keyevent`/`logging`/`metric`) use independent configuration blocks
- Field Simplification: The number of sharding identifiers for a single data category should not exceed 3
- Avoid Conflicts: It is recommended to use different naming conventions for sharding fields across different categories
Troubleshooting¶
Common Issues¶
Symptom | Troubleshooting Steps |
---|---|
Sharding not effective | 1. Check the Dataway logs with `grep 'sinker reload'`. 2. Verify the Header using `curl -v`. 3. Check the Sinker rule priority. |
Partial data loss | 1. Confirm the `strict` mode status. 2. Check whether the default rule exists. |
Identifier not injected | 1. Validate the Func configuration syntax. 2. Check whether the field is of string type. |
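Some of these checks can be automated before a rule file is deployed. The sketch below reports the `strict` setting, whether a default (`*`) rule exists, and which rule entries lack a `url`. It only inspects the JSON structure shown in this document; `summarize_sinker_config` is a hypothetical helper and the sample URL is a placeholder.

```python
import json


def summarize_sinker_config(text):
    """Summarize the points worth checking in a sinker.json document."""
    config = json.loads(text)
    entries = config.get("rules", [])
    return {
        "strict": bool(config.get("strict", False)),
        "has_default_rule": any("*" in entry.get("rules", []) for entry in entries),
        "rules_without_url": [i for i, entry in enumerate(entries) if not entry.get("url")],
    }


sample = json.dumps({
    "strict": False,
    "rules": [
        {"rules": ["{ env = 'prod' }"], "url": "https://example.invalid?token=tkn_placeholder"},
        {"rules": ["*"], "url": ""},
    ],
})
print(summarize_sinker_config(sample))
```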