Version: v1.0

Creating parsers

note

This guide assumes you are writing the parser from the test environment.

Base parser file

A simple parser can be defined as:

filter: 1 == 1
debug: true
onsuccess: next_stage
name: me/myparser
description: a cool parser for my service
grok:
#our grok pattern : capture .*
  pattern: ^%{DATA:some_data}$
#the field to which we apply the grok pattern : the log message itself
  apply_on: message
statics:
  - parsed: is_my_service
    value: yes

This parser contains:

  • a filter: if the expression evaluates to true, the event enters the parser; otherwise, it is skipped
  • an onsuccess: defines what happens once the event has been successfully parsed (keep going in the current stage, move to the next stage, etc.)
  • a name and a description
  • some statics that modify the event
  • a debug flag that enables local debugging information
  • a grok pattern to capture some data from the logs

We are going to use the following sample log as an example (x.log):

May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0

Trying our mock parser

caution

Your parser YAML file must be in the config/parsers/s01-parse/ directory. The stage directory might not exist yet; in that case, create it.

Let's try it:

./crowdsec -c ./dev.yaml -dsn file://x.log -type foobar
Expected output
INFO[20-08-2021 17:18:20] Crowdsec v1.1.1-linux-73e0bbaf93070f4a640eb5a22212b5dcf26699de
INFO[20-08-2021 17:18:21] reading x.log at once                         type="file://x.log"
DEBU[20-08-2021 17:18:21] + Grok '^%{DA...' returned 1 entries to merge in Parsed  id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21]   .Parsed['some_data'] = 'May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0 '  id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] + Processing 1 statics                        id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] .Parsed[is_my_service] = 'yes'                id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] Event leaving node : ok                       id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] move Event from stage s01-parse to s02-enrich  id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] + Grok '^%{DA...' returned 1 entries to merge in Parsed  id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21]   .Parsed['some_data'] = 'May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0'  id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] + Processing 1 statics                        id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] .Parsed[is_my_service] = 'yes'                id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] Event leaving node : ok                       id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] move Event from stage s01-parse to s02-enrich  id=billowing-flower name=me/myparser stage=s01-parse
...

Our "mock" parser is working. Let's see what happened:

  • The event enters the node
  • The filter returned true (1 == 1) so the event will be processed
  • Our grok pattern (just a .* capture) "worked" and captured data (the whole line actually)
  • The grok captures (under the name some_data) are merged into the .Parsed map of the event
  • The statics section is processed, and .Parsed[is_my_service] is set to yes
  • The event leaves the parser successfully, and because onsuccess is set to next_stage, the event moves to the next stage
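
The flow described above can be sketched in a few lines of Python. This is only an illustration under simplifying assumptions, not how CrowdSec is implemented: the event is modelled as a plain dictionary, `run_node` and `NODE` are hypothetical names, and the grok pattern `^%{DATA:some_data}$` is approximated by an ordinary regular expression.

```python
import re

# Hypothetical stand-in for the mock parser node; keys mirror the YAML fields.
NODE = {
    "name": "me/myparser",
    "filter": lambda evt: 1 == 1,  # always true, like `filter: 1 == 1`
    # Rough regex approximation of the grok pattern ^%{DATA:some_data}$
    "pattern": re.compile(r"^(?P<some_data>.*?)$"),
    "apply_on": "message",
    "statics": [{"parsed": "is_my_service", "value": "yes"}],
    "onsuccess": "next_stage",
}


def run_node(node, evt):
    """Return (evt, outcome): 'skipped', 'failed', or the node's onsuccess value."""
    # 1. The filter decides whether the event enters the node at all.
    if not node["filter"](evt):
        return evt, "skipped"
    # 2. The pattern is applied to the configured field.
    match = node["pattern"].search(evt[node["apply_on"]])
    if match is None:
        return evt, "failed"
    # 3. Named captures are merged into the Parsed map.
    evt["Parsed"].update(match.groupdict())
    # 4. Statics are processed after a successful match.
    for static in node["statics"]:
        evt["Parsed"][static["parsed"]] = static["value"]
    # 5. onsuccess tells the caller what to do next.
    return evt, node["onsuccess"]


event = {
    "message": "May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT=",
    "Parsed": {},
}
event, outcome = run_node(NODE, event)
print(outcome)
print(event["Parsed"])
```

The sketch returns the outcome instead of moving the event itself, which keeps the node logic separate from whatever schedules the stages.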

Writing the GROK pattern

We are going to write a parser for iptables logs, which look like this:

May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0

Using an online grok debugger or an online regex debugger, we come up with the following grok pattern :

\[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*

warning

Before writing a new pattern, check whether the one you need already exists in the patterns configuration.
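
To get a feel for what the grok pattern above extracts, it can be approximated with a plain regular expression. The sketch below is an illustration only: the sub-patterns (`IPV4`, `\S+` for the MAC field, `\w+` for WORD, `\d+` for INT) are simplified assumptions rather than the definitions shipped with the grok patterns, and `parse_iptables` is a hypothetical helper name.

```python
import re

# Simplified stand-ins for the grok sub-patterns (assumptions, not the real definitions).
IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"

IPTABLES_RE = re.compile(
    r"\[.*?\]+.*(?P<action>\w+)? IN=(?P<int_eth>\w+) OUT= MAC=\S+"
    rf" SRC=(?P<src_ip>{IPV4}) DST=(?P<dst_ip>{IPV4}) LEN=(?P<length>\d+)"
    r".*PROTO=(?P<proto>\w+) SPT=(?P<src_port>\d+) DPT=(?P<dst_port>\d+).*"
)

# First line of the sample log used throughout this guide.
LINE = (
    "May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= "
    "MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 "
    "LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 "
    "WINDOW=1024 RES=0x00 SYN URGP=0"
)


def parse_iptables(line):
    """Return the named captures as a dict, or None if the line does not match."""
    match = IPTABLES_RE.search(line)
    # Unmatched optional groups (such as action) become empty strings.
    return match.groupdict(default="") if match else None


fields = parse_iptables(LINE)
print(fields)
```

As with the grok version, the optional `action` capture stays empty for this sample line because nothing sits between the bracketed timestamp and ` IN=`.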

Test our new pattern

Now, let's integrate our GROK pattern within our YAML :

#let's set onsuccess to "next_stage" : if the log is parsed, we can consider it has been dealt with
onsuccess: next_stage
#debug, for reasons (don't do this in production)
debug: true
#as seen in our sample log, those logs are processed by the system and have a progname set to 'kernel'
filter: "1 == 1"
#name and description:
name: crowdsecurity/iptables-logs
description: "Parse iptables drop logs"
grok:
#our grok pattern
  pattern: \[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
#the field to which we apply the grok pattern : the log message itself
  apply_on: message
statics:
  - parsed: is_my_service
    value: yes

./crowdsec -c ./dev.yaml -dsn file://x.log -type foobar
Expected output
INFO[20-08-2021 17:47:46] reading x.log at once                         type="file://x.log"
DEBU[20-08-2021 17:47:46] + Grok '\[%{D...' returned 8 entries to merge in Parsed  id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['proto'] = 'TCP'                     id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['src_port'] = '45225'                id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['dst_port'] = '8888'                 id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['action'] = ''                       id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['int_eth'] = 'enp1s0'                id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['src_ip'] = '99.99.99.99'            id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['dst_ip'] = '127.0.0.1'              id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['length'] = '40'                     id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] + Processing 1 statics                        id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed[is_my_service] = 'yes'                id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] Event leaving node : ok                       id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] move Event from stage s01-parse to s02-enrich  id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
WARN[20-08-2021 17:47:46] Acquisition is finished, shutting down
DEBU[20-08-2021 17:47:46] + Grok '\[%{D...' returned 8 entries to merge in Parsed  id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['length'] = '60'                     id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['proto'] = 'TCP'                     id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['src_port'] = '53668'                id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['dst_port'] = '80'                   id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['action'] = ''                       id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['int_eth'] = 'enp1s0'                id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['src_ip'] = '44.44.44.44'            id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46]   .Parsed['dst_ip'] = '127.0.0.1'              id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] + Processing 1 statics                        id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed[is_my_service] = 'yes'                id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] Event leaving node : ok                       id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] move Event from stage s01-parse to s02-enrich  id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
...

What changed? We can now see that the fragments captured by the GROK pattern are merged into the Parsed map. We now have parsed data; only a few more changes and we will be done :)

Finalizing our parser

#let's set onsuccess to "next_stage" : if the log is parsed, we can consider it has been dealt with
onsuccess: next_stage
#debug, for reasons (don't do this in production)
debug: true
#as seen in our sample log, those logs are processed by the system and have a progname set to 'kernel'
filter: "evt.Parsed.program == 'kernel'"
#name and description:
name: crowdsecurity/iptables-logs
description: "Parse iptables drop logs"
grok:
#our grok pattern
  pattern: \[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
#the field to which we apply the grok pattern : the log message itself
  apply_on: message
statics:
    - meta: log_type
      value: iptables_drop
    - meta: service
      expression: "evt.Parsed.proto == 'TCP' ? 'tcp' : 'unknown'"
    - meta: source_ip
      expression: "evt.Parsed.src_ip"

filter

We changed the filter so that it matches on the program name. In the current example, our logs are produced by the kernel (netfilter), and thus the program is kernel:

tail -f /var/log/kern.log
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0
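
The filter only works if `evt.Parsed.program` has been populated before this parser runs. As a rough illustration of where the value `kernel` sits in the line and how the filter then decides, here is a small sketch. It rests on assumptions: `SYSLOG_RE` is a simplified, hypothetical syslog-header regex and not the pattern CrowdSec uses, `passes_filter` is a hypothetical Python rendering of the filter expression, and the `sshd` line is a made-up counter-example.

```python
import re

# Simplified syslog header regex (illustrative assumption only).
SYSLOG_RE = re.compile(
    r"^(?P<timestamp>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) (?P<host>\S+) "
    r"(?P<program>[^:\[\s]+)(?:\[\d+\])?: (?P<message>.*)$"
)


def passes_filter(parsed):
    """Python rendering of the filter: evt.Parsed.program == 'kernel'."""
    return parsed.get("program") == "kernel"


kernel_line = (
    "May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= "
    "MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60"
)
# Hypothetical line from another program, used only to show a rejected event.
other_line = "May 11 16:24:01 sd-126005 sshd[1234]: Failed password for root"

kernel_parsed = SYSLOG_RE.match(kernel_line).groupdict()
other_parsed = SYSLOG_RE.match(other_line).groupdict()

print(kernel_parsed["program"], passes_filter(kernel_parsed))
print(other_parsed["program"], passes_filter(other_parsed))
```

Events whose program is not `kernel` never reach the grok pattern, which avoids running a fairly expensive match on unrelated logs.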

statics

We are setting various entries to static or dynamic values to give "context" to the log :

  • .Meta.log_type is set to iptables_drop (so that we can later filter on events coming from this parser)
  • .Meta.source_ip is set to the source IP captured in .Parsed.src_ip
  • .Meta.service is set to the result of an expression that relies on the GROK output (the proto field)

See the dedicated statics documentation to learn more about what statics can do.
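
The three statics above amount to one fixed value and two expressions evaluated against the Parsed map. A minimal sketch in Python, assuming the event is a plain dictionary with `Parsed` and `Meta` keys (`apply_statics` is a hypothetical helper, not a CrowdSec function):

```python
def apply_statics(evt):
    """Mimic the three statics of the finalized parser on a dict-based event."""
    parsed = evt["Parsed"]
    meta = evt.setdefault("Meta", {})
    # - meta: log_type / value: iptables_drop  (static value)
    meta["log_type"] = "iptables_drop"
    # - meta: service / expression: evt.Parsed.proto == 'TCP' ? 'tcp' : 'unknown'
    meta["service"] = "tcp" if parsed.get("proto") == "TCP" else "unknown"
    # - meta: source_ip / expression: evt.Parsed.src_ip
    meta["source_ip"] = parsed.get("src_ip", "")
    return evt


tcp_event = apply_statics({"Parsed": {"proto": "TCP", "src_ip": "99.99.99.99"}})
udp_event = apply_statics({"Parsed": {"proto": "UDP", "src_ip": "44.44.44.44"}})
print(tcp_event["Meta"])
print(udp_event["Meta"])
```

Note that, as written in the YAML, any protocol other than TCP ends up with the service `unknown`.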

Testing our finalized parser

./crowdsec -c ./dev.yaml -dsn file://x.log -type kernel
Expected output
...
INFO[20-08-2021 17:49:02] reading x.log at once                         type="file://x.log"
DEBU[20-08-2021 17:49:02] eval(evt.Parsed.program == 'kernel') = TRUE   id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] eval variables:                               id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]        evt.Parsed.program = 'kernel'          id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] + Grok '\[%{D...' returned 8 entries to merge in Parsed  id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['proto'] = 'TCP'                     id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['src_port'] = '45225'                id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['dst_port'] = '8888'                 id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['action'] = ''                       id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['int_eth'] = 'enp1s0'                id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['src_ip'] = '99.99.99.99'            id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['dst_ip'] = '127.0.0.1'              id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['length'] = '40'                     id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] + Processing 3 statics                        id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] .Meta[log_type] = 'iptables_drop'             id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] .Meta[service] = 'tcp'                        id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] .Meta[source_ip] = '99.99.99.99'              id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] Event leaving node : ok                       id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] move Event from stage s01-parse to s02-enrich  id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
WARN[20-08-2021 17:49:02] Acquisition is finished, shutting down
DEBU[20-08-2021 17:49:02] eval(evt.Parsed.program == 'kernel') = TRUE   id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] eval variables:                               id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]        evt.Parsed.program = 'kernel'          id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] + Grok '\[%{D...' returned 8 entries to merge in Parsed  id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['src_ip'] = '44.44.44.44'            id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['dst_ip'] = '127.0.0.1'              id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['length'] = '60'                     id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['proto'] = 'TCP'                     id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['src_port'] = '53668'                id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['dst_port'] = '80'                   id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['action'] = ''                       id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02]   .Parsed['int_eth'] = 'enp1s0'                id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] + Processing 3 statics                        id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] .Meta[log_type] = 'iptables_drop'             id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] .Meta[service] = 'tcp'                        id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] .Meta[source_ip] = '44.44.44.44'              id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] Event leaving node : ok                       id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:49:02] move Event from stage s01-parse to s02-enrich  id=withered-sun name=crowdsecurity/iptables-logs stage=s01-parse
...

Closing word

We now have a fully functional parser for iptables logs! We can either deploy it to our production systems or, even better, contribute it to the hub!

If you want to know more about the available directives and what they can do, take a look at the parser reference documentation.