How to Write Pipeline Scripts¶
Writing Pipelines can be tedious, so Datakit ships with a simple built-in debugging tool to help you write Pipeline scripts.
Debug grok and pipeline¶
Specify the Pipeline script name and feed it a piece of text to check whether the extraction succeeds.
The Pipeline script must be placed in the [Datakit installation directory]/pipeline directory.
$ datakit pipeline -P your_pipeline.p -T '2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms'
Extracted data(cost: 421.705µs): # Indicates successful cutting
{
"code" : "io/io.go: 458", # Corresponding code position
"level" : "DEBUG", # Corresponding log level
"module" : "io", # Corresponding code module
"msg" : "post cost 6.87021ms", # Pure log attributes
"time" : 1610358231887000000 # Log time (Unix nanosecond timestamp)
"message": "2021-01-11T17:43:51.887+0800 DEBUG io io/io.g o:458 post cost 6.87021ms"
}
Extraction failure example (only the message field is left, indicating that the other fields were not extracted):
$ datakit pipeline -P other_pipeline.p -T '2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms'
{
"message": "2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms"
}
If the debug text is complex, you can write it to a file (sample.log) and debug it as follows:
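# assuming the file flag is -F; run datakit help pipeline to confirm the flags available in your version
$ datakit pipeline -P your_pipeline.p -F sample.log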
For more Pipeline debugging commands, see datakit help pipeline.
Grok Wildcard Search¶
Because there are many Grok patterns, matching them by hand is tedious. Datakit provides an interactive command-line tool, grokq (grok query):
datakit tool --grokq
grokq > Mon Jan 25 19:41:17 CST 2021 # Enter the text you want to match here
2 %{DATESTAMP_OTHER: ?} # The tool suggests matching patterns; the leading number is the weight, and the more precise the match, the higher the weight
0 %{GREEDYDATA: ?}
grokq > 2021-01-25T18:37:22.016+0800
4 %{TIMESTAMP_ISO8601: ?} # The ? here means you still need to give the matched text a field name
0 %{NOTSPACE: ?}
0 %{PROG: ?}
0 %{SYSLOGPROG: ?}
0 %{GREEDYDATA: ?} # A wide range of patterns like GREEDYDATA have low weights
# The higher the weight, the greater the matching accuracy
grokq > Q # type Q or exit to quit
Bye!
Warning
On Windows, run the debugging commands in PowerShell.
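For example, the debug command above might look like this in PowerShell (a sketch; the path to datakit.exe depends on your installation directory):
PS > .\datakit.exe pipeline -P your_pipeline.p -T '2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms'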
How to Handle Multi-line Logs¶
When dealing with call-stack related logs, logs like the one below cannot be handled directly with the GREEDYDATA pattern, because the number of lines is not fixed:
2022-02-10 16:27:36.116 ERROR 1629881 --- [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
java.lang.NullPointerException: null
at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)
at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)
at java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)
at java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)
at java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)
at java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)
Here the GREEDYLINES pattern can be used to match across lines, for example in /usr/local/datakit/pipeline/test.p:
add_pattern('_dklog_date', '%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}%{INT}')
grok(_, '%{_dklog_date:log_time}\\s+%{LOGLEVEL:Level}\\s+%{NUMBER:Level_value}\\s+---\\s+\\[%{NOTSPACE:thread_name}\\]\\s+%{GREEDYDATA:Logger_name}\\s+(\\n)?(%{GREEDYLINES:stack_trace})')
# Remove the message field here for easy debugging
drop_origin_data()
Save the above multi-line log as multi-line.log and debug it:
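# assuming the same file flag as above (-F); see datakit help pipeline
$ datakit pipeline -P test.p -F multi-line.log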
The following cutting results are obtained:
{
"Level": "ERROR",
"Level_value": "1629881",
"Logger_name": "o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task",
"log_time": "2022-02-10 16:27:36.116",
"stack_trace": "java.lang.NullPointerException: null\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)\n\tat java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)\n\tat java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)\n\tat java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)\n\tat java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)\n\tat java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)\n\tat java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)\n\tat java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)",
"thread_name": "scheduling-1"
}
Pipeline Field Naming Notes¶
Every field extracted by a Pipeline is a field rather than a tag. Because of line-protocol constraints, you should not extract any field whose name collides with an existing tag. These tags include the following categories:
- Global Tag in Datakit
- Custom Tag in Log Collector
In addition, all collected logs have the following reserved fields. You should not override them, otherwise the data may not display properly on the observability platform:
Field Name | Type | Description
---|---|---
source | string(tag) | Log source
service | string(tag) | Service the log belongs to; by default the same as source
status | string(tag) | Log level
message | string(field) | Original log text
time | int | Log timestamp
Abstract
Of course, these tag values can be overridden by specific Pipeline functions.
If a field extracted by the Pipeline has the same name as an existing tag (case sensitive), it causes data errors, so it is recommended to avoid these names when naming fields in Pipeline cutting.
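For instance (a sketch, not taken from the original examples), if a grok pattern would otherwise produce a field named status that is not the log level, it can be renamed before the data is reported:
grok(_, "%{INT:status} ...")   # hypothetical pattern that produces a reserved field name
rename("http_status", status)  # rename(new-name, old-key): avoids colliding with the reserved status tag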
Complete Pipeline Sample¶
Take the cutting of Datakit's own logs as an example. A Datakit log line (the same sample used in the debugging examples above) looks like this:
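2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms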
Write the corresponding pipeline:
# pipeline for datakit log
# Mon Jan 11 10:42:41 CST 2021
# auth: tanb
grok(_, '%{_dklog_date:log_time}%{SPACE}%{_dklog_level:level}%{SPACE}%{_dklog_mod:module}%{SPACE}%{_dklog_source_file:code}%{SPACE}%{_dklog_msg:msg}')
rename("time", log_time) # rename log_time to time
default_time(time) # use the time field as the timestamp of the output data
drop_origin_data() # discard the original log text (not recommended)
Several user-defined patterns are referenced here, such as _dklog_date and _dklog_level. These rules are placed under [Datakit installation directory]/pipeline/pattern.
Note that a user-defined pattern must be placed in the [Datakit installation directory]/pipeline/pattern/ directory if it is to take effect globally (that is, be usable from other Pipeline scripts):
$ cat pipeline/pattern/datakit
# Note: it is best to add a specific prefix to custom pattern names so they do not conflict with built-in names (built-in pattern names cannot be overridden)
# The custom pattern format is:
# <pattern-name><space><pattern combination>
_dklog_date %{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}%{INT}
_dklog_level (DEBUG|INFO|WARN|ERROR|FATAL)
_dklog_mod %{WORD}
_dklog_source_file (/?[\w_%!$@:.,-]?/?)(\S+)?
_dklog_msg %{GREEDYDATA}
Now that you have the pipeline and its referenced pattern, you can cut this line of logs through Datakit's built-in pipeline debugging tool:
# A successful extraction example
$ ./datakit pipeline -P dklog_pl.p -T '2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms'
Extracted data(cost: 421.705µs):
{
"code": "io/io.go:458",
"level": "DEBUG",
"module": "io",
"msg": "post cost 6.87021ms",
"time": 1610358231887000000
}
FAQ¶
When debugging a Pipeline, why can't some variables be referenced?¶
Pipeline:
json(_, message, "message")
json(_, thread_name, "thread")
json(_, level, "status")
json(_, @timestamp, "time")
The error reported is as follows:
A: For variables that contain special characters, you need to wrap them in a pair of backtick characters (`):
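For example, the @timestamp field above can be written as:
json(_, `@timestamp`, "time")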
See Basic syntax rules of Pipeline
When debugging Pipeline, why can't you find the corresponding Pipeline script?¶
The command is as follows:
$ datakit pipeline -P test.p -T "..."
[E] get pipeline failed: stat /usr/local/datakit/pipeline/test.p: no such file or directory
A: Pipeline scripts used for debugging must be placed in the [Datakit installation directory]/pipeline directory.
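On a default Linux installation, that amounts to something like this (a sketch; the path comes from the error message above):
$ cp test.p /usr/local/datakit/pipeline/
$ datakit pipeline -P test.p -T "..."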
How to cut logs in many different formats in one Pipeline?¶
In everyday logs, different services produce logs in many different forms, so multiple Grok cuts have to be written. To improve Grok efficiency, match the most frequent log formats first: order the grok() calls by how often each format appears, so that the majority of logs are matched by the earlier groks and pointless matching is avoided.
Warning
In log cutting, Grok matching is the most expensive part, so avoiding repeated Grok matching can greatly improve the cutting performance of Grok.
grok(_, "%{NOTSPACE:client_ip} %{NOTSPACE:http_ident} ...")
if client_ip != nil {
# The grok above matched, so continue processing this type of log
...
} else {
# The grok above did not match, so this is a different log format; try another grok
grok(_, "%{date2:time} \\[%{LOGLEVEL:status}\\] %{GREEDYDATA:msg} ...")
if status != nil {
# Here you can check whether the grok above matches...
} else {
# Still unrecognized; another grok() can be added here to handle it, and so on, step by step
}
}
How to discard a field during cutting?¶
In some cases, all we need is a few fields from the middle of a log line, but it is hard to skip over the parts before them. For example, if we only need the value 44, which may be a response delay, we can cut it so that only that value becomes a field, that is, simply leave out the :some_field part in Grok for the pieces we do not need, as sketched below:
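A minimal sketch (the surrounding log layout is assumed for illustration): a pattern written without a :field_name suffix still has to match, but produces no field, so only response_time is extracted here:
grok(_, "%{INT} %{INT} %{INT} %{INT} %{INT:response_time} %{GREEDYDATA}")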
add_pattern() Escape Problem¶
When you use add_pattern() to add local patterns, you can easily run into escaping problems, for example with the following pattern (used to match file paths and file names, similar to the _dklog_source_file pattern shown earlier):
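(/?[\w_%!$@:.,-]?/?)(\S+)?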
If we put it in the global pattern directory (that is, the pipeline/pattern directory), we can write it like this:
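# a sketch, assuming the pattern is named source_file; no extra escaping is needed in a pattern file
source_file (/?[\w_%!$@:.,-]?/?)(\S+)?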
If you use add_pattern() in the script instead, it needs to be written like this:
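# the same assumed source_file pattern added locally via add_pattern()
add_pattern('source_file', '(/?[\\w_%!$@:.,-]?/?)(\\S+)?')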
That is, the backslash needs to be escaped.