Creating parsers
note
This guide assumes you are writing the parser from the test environment.
Base parser file
A simple parser can be defined as:

    filter: 1 == 1
    debug: true
    onsuccess: next_stage
    name: me/myparser
    description: a cool parser for my service
    grok:
    #our grok pattern : capture .*
      pattern: ^%{DATA:some_data}$
    #the field to which we apply the grok pattern : the log message itself
      apply_on: message
    statics:
      - parsed: is_my_service
        value: yes

It is made of:
- a filter: if the expression is true, the event will enter the parser, otherwise it won't
- an onsuccess: defines what happens when the event was successfully parsed: shall we continue? shall we move to the next stage? etc.
- a name & a description
- some statics that will modify the event
- a debug flag that enables local debugging information
- a grok pattern to capture some data in logs
We are going to use the following sample log as an example (x.log):

    May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0
    May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0

Trying our mock parser
caution
Your parser YAML file must be in the config/parsers/s01-parse/ directory. If the stage directory does not exist, create it.
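The setup described so far can be sketched as a few shell commands. This is only an illustration under assumptions: it is run from the root of the test environment, and the parser file name myparser.yaml is hypothetical (the guide does not name the file). The directory, the parser body and the sample log come from the text above.

```shell
# Create the stage directory if it does not exist yet
mkdir -p config/parsers/s01-parse/

# Write the mock parser (myparser.yaml is a hypothetical file name).
# The quoted 'EOF' keeps the shell from expanding $ or % sequences.
cat > config/parsers/s01-parse/myparser.yaml <<'EOF'
filter: 1 == 1
debug: true
onsuccess: next_stage
name: me/myparser
description: a cool parser for my service
grok:
  pattern: ^%{DATA:some_data}$
  apply_on: message
statics:
  - parsed: is_my_service
    value: yes
EOF

# Write the two sample log lines to x.log
cat > x.log <<'EOF'
May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0
EOF
```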
Let's try it:
    ./crowdsec -c ./dev.yaml -dsn file://x.log -type foobar

Expected output

    INFO[20-08-2021 17:18:20] Crowdsec v1.1.1-linux-73e0bbaf93070f4a640eb5a22212b5dcf26699de
    INFO[20-08-2021 17:18:21] reading x.log at once type="file://x.log"
    DEBU[20-08-2021 17:18:21] + Grok '^%{DA...' returned 1 entries to merge in Parsed id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] .Parsed['some_data'] = 'May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0 ' id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] + Processing 1 statics id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] .Parsed[is_my_service] = 'yes' id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] Event leaving node : ok id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] move Event from stage s01-parse to s02-enrich id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] + Grok '^%{DA...' returned 1 entries to merge in Parsed id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] .Parsed['some_data'] = 'May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0' id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] + Processing 1 statics id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] .Parsed[is_my_service] = 'yes' id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] Event leaving node : ok id=billowing-flower name=me/myparser stage=s01-parse
    DEBU[20-08-2021 17:18:21] move Event from stage s01-parse to s02-enrich id=billowing-flower name=me/myparser stage=s01-parse
    ...

We can see our "mock" parser is working. Let's see what happened:
- The event enters the node
- The filter returned true (1 == 1) so the event will be processed
- Our grok pattern (just a .* capture) "worked" and captured data (the whole line actually)
- The grok captures (under the name some_data) are merged into the .Parsed map of the event
- The statics section is processed, and .Parsed[is_my_service] is set to yes
- The event leaves the parser successfully, and because onsuccess is set to next_stage, the event moves to the next stage
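When the debug output gets long, it can help to keep only the assignments into the .Parsed map. The sketch below assumes the output was saved to a file; the name run.log is hypothetical, and how you capture the output depends on your logging configuration. To stay self-contained, it fills run.log with two lines copied from the expected output above.

```shell
# Hypothetical capture file, filled here with two lines from the expected output
cat > run.log <<'EOF'
DEBU[20-08-2021 17:18:21] + Processing 1 statics id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] .Parsed[is_my_service] = 'yes' id=billowing-flower name=me/myparser stage=s01-parse
EOF

# Keep only lines that assign into .Parsed, then strip the log prefix
# and the trailing id/name/stage fields
grep -F '.Parsed[' run.log \
  | sed -E "s/^[^.]*(\.Parsed\[[^]]*\] = '.*') id=.*$/\1/"
# prints: .Parsed[is_my_service] = 'yes'
```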
Writing the GROK pattern
We are going to write a parser for iptables logs. They look like this:

    May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0
    May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0
Using an online grok debugger or an online regex debugger, we come up with the following grok pattern :

    \[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*

warning
Before writing a new pattern, check whether the one you need is already present in the patterns configuration.
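A grok pattern is essentially a regular expression built from named macros, so a rough sanity check of the field layout is possible with plain sed before running CrowdSec. The sketch below is not grok: the character classes standing in for WORD, IP and INT are simplified assumptions, and it only confirms that the fields appear where the pattern expects them in the first sample line.

```shell
# First sample line from the iptables log above
line='May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0'

# Simplified stand-ins (assumptions): WORD -> [[:alnum:]_]+, IP -> [0-9.]+, INT -> [0-9]+
printf '%s\n' "$line" | sed -E -n \
  's/^.*\[[^]]*\].* IN=([[:alnum:]_]+) OUT= MAC=[0-9a-f:]+ SRC=([0-9.]+) DST=([0-9.]+) LEN=([0-9]+).*PROTO=([[:alnum:]_]+) SPT=([0-9]+) DPT=([0-9]+).*$/int_eth=\1 src_ip=\2 dst_ip=\3 length=\4 proto=\5 src_port=\6 dst_port=\7/p'
# prints: int_eth=enp1s0 src_ip=99.99.99.99 dst_ip=127.0.0.1 length=40 proto=TCP src_port=45225 dst_port=8888
```

If sed prints nothing, the line did not match, which usually means a literal such as `OUT= MAC=` is not where the expression expects it.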
Test our new pattern
Now, let's integrate our GROK pattern within our YAML:

    #let's set onsuccess to "next_stage" : if the log is parsed, we can consider it has been dealt with
    onsuccess: next_stage
    #debug, for reasons (don't do this in production)
    debug: true
    #keep the catch-all filter for now : we will filter on the program name later
    filter: "1 == 1"
    #name and description:
    name: crowdsecurity/iptables-logs
    description: "Parse iptables drop logs"
    grok:
    #our grok pattern
      pattern: \[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
    #the field to which we apply the grok pattern : the log message itself
      apply_on: message
    statics:
      - parsed: is_my_service
        value: yes

    ./crowdsec -c ./dev.yaml -dsn file://x.log -type foobar

Expected output

    INFO[20-08-2021 17:47:46] reading x.log at once type="file://x.log"
    DEBU[20-08-2021 17:47:46] + Grok '\[%{D...' returned 8 entries to merge in Parsed id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['proto'] = 'TCP' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['src_port'] = '45225' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['dst_port'] = '8888' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['action'] = '' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['int_eth'] = 'enp1s0' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['src_ip'] = '99.99.99.99' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['dst_ip'] = '127.0.0.1' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['length'] = '40' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] + Processing 1 statics id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed[is_my_service] = 'yes' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] Event leaving node : ok id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] move Event from stage s01-parse to s02-enrich id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    WARN[20-08-2021 17:47:46] Acquisition is finished, shutting down
    DEBU[20-08-2021 17:47:46] + Grok '\[%{D...' returned 8 entries to merge in Parsed id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['length'] = '60' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['proto'] = 'TCP' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['src_port'] = '53668' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['dst_port'] = '80' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['action'] = '' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['int_eth'] = 'enp1s0' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['src_ip'] = '44.44.44.44' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed['dst_ip'] = '127.0.0.1' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] + Processing 1 statics id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] .Parsed[is_my_service] = 'yes' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] Event leaving node : ok id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:47:46] move Event from stage s01-parse to s02-enrich id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
    ...

What changed? We can now see that the fragments captured by the GROK pattern are merged into the .Parsed map!
We now have parsed data, only a few more changes and we will be done :)
Finalizing our parser

    #let's set onsuccess to "next_stage" : if the log is parsed, we can consider it has been dealt with
    onsuccess: next_stage
    #debug, for reasons (don't do this in production)
    debug: true
    #as seen in our sample log, those logs are processed by the system and have a progname set to 'kernel'
    filter: "evt.Parsed.program == 'kernel'"
    #name and description:
    name: crowdsecurity/iptables-logs
    description: "Parse iptables drop logs"
    grok:
    #our grok pattern
      pattern: \[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
    #the field to which we apply the grok pattern : the log message itself
      apply_on: message
    statics:
      - meta: log_type
        value: iptables_drop
      - meta: service
        expression: "evt.Parsed.proto == 'TCP' ? 'tcp' : 'unknown'"
      - meta: source_ip
        expression: "evt.Parsed.src_ip"

filter
We changed the filter to correctly filter on the program name.
In the current example, our logs are produced by the kernel (netfilter), and thus the program is kernel :

    tail -f /var/log/kern.log
    May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0

statics
We are setting various entries to static or dynamic values to give "context" to the log :
- .Meta.log_type is set to iptables_drop (so that we can later filter events coming from this parser)
- .Meta.source_ip is set to the source IP captured in .Parsed.src_ip
- .Meta.service is set to the result of an expression that relies on the GROK output (proto field)

See the dedicated statics documentation to learn more about what statics can do.
Testing our finalized parser

    ./crowdsec -c ./dev.yaml -dsn file://x.log -type kernel

Expected output

    ...
    INFO[20-08-2021 17:49:02] reading x.log at once type="file://x.log"
    DEBU[20-08-2021 17:49:02] eval(evt.Parsed.program == 'kernel') = TRUE id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] eval variables: id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] evt.Parsed.program = 'kernel' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] + Grok '\[%{D...' returned 8 entries to merge in Parsed id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['proto'] = 'TCP' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['src_port'] = '45225' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['dst_port'] = '8888' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['action'] = '' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['int_eth'] = 'enp1s0' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['src_ip'] = '99.99.99.99' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['dst_ip'] = '127.0.0.1' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['length'] = '40' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] + Processing 3 statics id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Meta[log_type] = 'iptables_drop' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Meta[service] = 'tcp' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Meta[source_ip] = '99.99.99.99' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] Event leaving node : ok id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] move Event from stage s01-parse to s02-enrich id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    WARN[20-08-2021 17:49:02] Acquisition is finished, shutting down
    DEBU[20-08-2021 17:49:02] eval(evt.Parsed.program == 'kernel') = TRUE id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] eval variables: id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] evt.Parsed.program = 'kernel' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] + Grok '\[%{D...' returned 8 entries to merge in Parsed id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['src_ip'] = '44.44.44.44' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['dst_ip'] = '127.0.0.1' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['length'] = '60' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['proto'] = 'TCP' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['src_port'] = '53668' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['dst_port'] = '80' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['action'] = '' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Parsed['int_eth'] = 'enp1s0' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] + Processing 3 statics id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Meta[log_type] = 'iptables_drop' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Meta[service] = 'tcp' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] .Meta[source_ip] = '44.44.44.44' id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] Event leaving node : ok id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    DEBU[20-08-2021 17:49:02] move Event from stage s01-parse to s02-enrich id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
    ...

Closing word
We now have a fully functional parser for iptables logs! We can either deploy it to our production systems or, even better, contribute it to the hub!
If you want to know more about the available directives and what they can do, take a look at the parser reference documentation!