How to Write a Pipeline Script¶
Writing Pipeline scripts can be complex, so Datakit ships with a simple debugging tool to help you develop and test them.
Debugging Grok and Pipeline¶
Specify the name of the Pipeline script and input a text snippet to determine if extraction is successful.
The Pipeline script must be placed in the [Datakit installation directory]/pipeline directory.
$ datakit pipeline -P your_pipeline.p -T '2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms'
Extracted data(cost: 421.705µs): # Indicates successful parsing
{
"code" : "io/io.go: 458", # Corresponding code location
"level" : "DEBUG", # Corresponding log level
"module" : "io", # Corresponding code module
"msg" : "post cost 6.87021ms", # Raw log content
"time" : 1610358231887000000 # Log time (Unix nanosecond timestamp)
"message": "2021-01-11T17:43:51.887+0800 DEBUG io io/io.g o:458 post cost 6.87021ms"
}
Example of failed extraction (only message remains, indicating the other fields were not extracted):
$ datakit pipeline -P other_pipeline.p -T '2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms'
{
"message": "2021-01-11T17:43:51.887+0800 DEBUG io io/io.g o:458 post cost 6.87021ms"
}
If the debug text is complex, you can write it into a file (sample.log) and use the following method for debugging:
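A minimal sketch of the file-based invocation; -F is assumed here to be the file-input flag (confirm with datakit help pipeline):
$ datakit pipeline -P your_pipeline.p -F sample.log   # -F (read sample text from a file) assumed; see datakit help pipeline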
For more Pipeline debugging commands, see datakit help pipeline.
Grok Wildcard Search¶
Given the numerous Grok patterns, manual matching can be cumbersome. Datakit provides an interactive command-line tool, grokq (Grok query):
datakit tool --grokq
grokq > Mon Jan 25 19:41:17 CST 2021 # Enter the text you want to match here
2 %{DATESTAMP_OTHER: ?} # The tool suggests corresponding matches; higher-ranked matches are more precise (higher weight). The number before indicates the weight.
0 %{GREEDYDATA: ?}
grokq > 2021-01-25T18:37:22.016+0800
4 %{TIMESTAMP_ISO8601: ?} # The ? indicates that you need to give the matched text a field name
0 %{NOTSPACE: ?}
0 %{PROG: ?}
0 %{SYSLOGPROG: ?}
0 %{GREEDYDATA: ?} # Patterns like GREEDYDATA have a wide range and lower weight
# Higher weights mean better precision
grokq > Q # Use Q or exit to quit
Bye!
Warning
On Windows, execute the debugging in PowerShell.
Handling Multi-line Logs¶
When dealing with call-stack logs, whose line counts vary, the GREEDYDATA pattern alone cannot directly handle logs like the following:
2022-02-10 16:27:36.116 ERROR 1629881 --- [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
java.lang.NullPointerException: null
at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)
at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)
at java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)
at java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)
at java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)
at java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)
Here, you can use the GREEDYLINES rule for wildcard matching, as shown in /usr/local/datakit/pipeline/test.p:
add_pattern('_dklog_date', '%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}%{INT}')
grok(_, '%{_dklog_date:log_time}\\s+%{LOGLEVEL:Level}\\s+%{NUMBER:Level_value}\\s+---\\s+\\[%{NOTSPACE:thread_name}\\]\\s+%{GREEDYDATA:Logger_name}\\s+(\\n)?(%{GREEDYLINES:stack_trace})')
# Remove the message field for easier debugging
drop_origin_data()
Save the above multi-line log as multi-line.log and debug it:
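A sketch of the debug command, again assuming -F reads the sample from a file (confirm with datakit help pipeline):
$ datakit pipeline -P test.p -F multi-line.log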
The parsed result is as follows:
{
"Level": "ERROR",
"Level_value": "1629881",
"Logger_name": "o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task",
"log_time": "2022-02-10 16:27:36.116",
"stack_trace": "java.lang.NullPointerException: null\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)\n\tat java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)\n\tat java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)\n\tat java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)\n\tat java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)\n\tat java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)\n\tat java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)\n\tat java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)",
"thread_name": "scheduling-1"
}
Pipeline Field Naming Considerations¶
All fields extracted by the Pipeline are metrics (fields), not tags. Due to line-protocol constraints, we should not extract any field whose name collides with an existing tag on the data point, such as tags configured globally on Datakit or tags added by the specific collector.
Additionally, all collected logs contain multiple reserved fields. We should not overwrite these fields, as doing so may cause data to display incorrectly on the Explorer page.
Field Name | Type | Description
---|---|---
source | string(tag) | Log source
service | string(tag) | Service the log belongs to; defaults to the value of source
status | string(tag) | Log level
message | string(field) | Original log content
time | int | Log timestamp
Tip
We can override the values of these tags using specific Pipeline functions.
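For example, a minimal sketch assuming the set_tag() function is available in your Datakit version (check the Pipeline function reference):
# set_tag() assumed available: overwrite the reserved service tag with a placeholder value
set_tag(service, "my-service")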
If any field extracted by the Pipeline has the same name as an existing tag (case-sensitive), it will result in an error when the data point is built, because the line protocol does not allow a tag and a field to share the same key. Therefore, it's best to avoid such naming conflicts in the Pipeline.
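One way to sidestep a collision, sketched with the rename() function used in the complete example below (the capture and field names here are illustrative):
# Use a non-reserved capture name, then rename the extracted field if needed
grok(_, '%{NOTSPACE:svc}%{SPACE}%{GREEDYDATA:msg}')
rename('service_name', svc)   # svc -> service_name; never overwrite the reserved service tag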
Complete Pipeline Example¶
Here’s an example of parsing Datakit's own logs. The format of Datakit logs is as follows:
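2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms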
Corresponding Pipeline:
# Pipeline for Datakit logs
# Mon Jan 11 10:42:41 CST 2021
# auth: tanb
grok(_, '%{_dklog_date:log_time}%{SPACE}%{_dklog_level:level}%{SPACE}%{_dklog_mod:module}%{SPACE}%{_dklog_source_file:code}%{SPACE}%{_dklog_msg:msg}')
rename("time", log_time) # Rename log_time to time
default_time(time) # Set the time field as the output data timestamp
drop_origin_data() # Discard original log text (not recommended)
This references several user-defined patterns, such as _dklog_date and _dklog_level. Place these rules in [Datakit installation directory]/pipeline/pattern/.
Warning
User-defined patterns that need to take effect globally (i.e., used in other Pipeline scripts) must be placed in [Datakit installation directory]/pipeline/pattern/:
$ cat pipeline/pattern/datakit
# Note: It's best to prefix custom pattern names to avoid conflicts with built-in names
# Built-in pattern names cannot be overridden
#
# Custom pattern format:
# <pattern-name><space><specific pattern combination>
#
_dklog_date %{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}%{INT}
_dklog_level (DEBUG|INFO|WARN|ERROR|FATAL)
_dklog_mod %{WORD}
_dklog_source_file (/?[\w_%!$@:.,-]?/?)(\S+)?
_dklog_msg %{GREEDYDATA}
Now that both the Pipeline and its referenced patterns are set up, you can use Datakit's built-in Pipeline debugging tool to parse this log line:
# Successful extraction example
datakit pipeline -P dklog_pl.p -T '2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms'
Extracted data(cost: 421.705µs):
{
"code": "io/io.go:458",
"level": "DEBUG",
"module": "io",
"msg": "post cost 6.87021ms",
"time": 1610358231887000000
}
FAQ¶
Why Can't Variables Be Referenced During Pipeline Debugging?¶
Consider the following Pipeline:
json(_, message, "message")
json(_, thread_name, "thread")
json(_, level, "status")
json(_, @timestamp, "time")
This fails with a parse error because the variable name (@timestamp) contains special characters. In such cases, you need to escape the variable:
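A sketch of the escaped call, assuming backtick quoting for identifiers with special characters (see the syntax reference below):
json(_, `@timestamp`, "time")   # backticks assumed as the escape for the @ in the variable name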
Refer to the Basic Syntax Rules for Pipeline.
Why Can't the Corresponding Pipeline Script Be Found During Pipeline Debugging?¶
Command:
$ datakit pipeline -P test.p -T "..."
[E] get pipeline failed: stat /usr/local/datakit/pipeline/test.p: no such file or directory
This is because the Pipeline being debugged is located in the wrong place. Debugging Pipeline scripts must be placed in the [Datakit installation directory]/pipeline/ directory.
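A minimal fix, assuming the default installation path shown in the error above:
$ cp test.p /usr/local/datakit/pipeline/
$ datakit pipeline -P test.p -T "..."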
How to Parse Multiple Different Formats of Logs in One Pipeline?¶
In daily work, logs take various forms depending on the business. To improve Grok performance, order the Grok blocks by how frequently each log format occurs; that way most logs match within the first few Groks, and unnecessary matching is avoided.
Tip
Grok matching is the most resource-intensive part of log parsing. Avoid redundant Grok matches to significantly enhance Grok performance.
grok(_, "%{NOTSPACE:client_ip} %{NOTSPACE:http_ident} ...")
if client_ip != nil {
# Confirms the Grok has matched; proceed with subsequent processing based on this log
...
} else {
# Indicates a different log type; try another Grok
grok(_, "%{date2:time} \\[%{LOGLEVEL:status}\\] %{GREEDYDATA:msg} ...")
if status != nil {
# Check if the Grok matched
} else {
# Unrecognized log or add another Grok for further processing
}
}
How to Drop Fields During Parsing?¶
In some cases, we only need a few fields from the middle of a log but cannot skip the parts before them. Suppose we only need the value 44 from such a log (say, the response latency). You can parse it by naming only that capture and leaving :some_field off every other pattern in the Grok expression:
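A minimal sketch; since the sample log is not shown here, the surrounding patterns and the field name response_time are purely illustrative:
# Only the named capture (response_time) is extracted; the rest is matched and discarded
grok(_, '%{NOTSPACE} %{NOTSPACE} %{INT:response_time} %{GREEDYDATA}')
cast(response_time, "int")   # convert the captured string to an integer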
Escaping Issues with add_pattern()¶
When using add_pattern() to add local patterns, escaping issues can arise. Take a pattern that matches file paths and filenames (like the _dklog_source_file rule above). If placed in the global pattern directory (pipeline/pattern), it can be written as:
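A sketch reusing the file-path regex from _dklog_source_file above; the pattern name source_file is illustrative:
# single backslashes, written as-is in the pattern file
source_file (/?[\w_%!$@:.,-]?/?)(\S+)?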
If using add_pattern(), it needs to be written as:
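The same illustrative pattern, with each backslash doubled inside the quoted string:
add_pattern('source_file', '(/?[\\w_%!$@:.,-]?/?)(\\S+)?')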
That is, backslashes need to be escaped.