Split filebeat log with a variable length into several new events

Hi.
I'm fairly new to ELK and have been struggling to get this working, with no luck, so I'm asking for a way to solve it.
I'm using Filebeat to ship logs to my Elasticsearch server, and the log lines look like this:

2021 Mar 30 00:45:01;1617075901;user1,gw11,0;user1,gw22,5;user2,gw33,2;user2,gw43,3
or
2021 Mar 30 00:46:01;1617075961;user1,gw11,3;user1,gw22,5
or
2021 Mar 30 00:47:01;1617076021;user1,gw11,5;user1,gw22,5;user3,gw33,2;user3,gw43,3;user4,gw44,4;user4,gw55,3
and so on...

So, generally speaking, each log line looks like this, with the user,gateway,number group repeated a variable number of times:
date;timestamp;user,gateway,number;user,gateway,number;...

So far I have been able to parse the first two fields with something like this:

input ...
filter {
        dissect {
          mapping => {"message" => "%{fecha};%{timestamp};%{data}"}
        }

        date {
          match => ["timestamp", "UNIX"]
          target => "@fecha_exe"
        }
}
output....

So the fecha and timestamp fields are parsed fine from the message.
The problem is the data field. It holds a variable number of user,gateway,number groups separated by ";", and I need to split it on ";" and create a new event for each group.

So I would like to turn an input like this:
2021 Mar 30 00:45:01;1617075901;user1,gw11,0;user1,gw22,5;user2,gw33,2;user2,gw43,3

into four events like this:

{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:45:01",
        "_type": "doc",
        "user": "user1",
        "gateway": "gw11",
        "value": 0
}

{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:45:01",
        "_type": "doc",
        "user": "user1",
        "gateway": "gw22",
        "value": 5
}

{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:45:01",
        "_type": "doc",
        "user": "user2",
        "gateway": "gw33",
        "value": 2
}

{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:45:01",
        "_type": "doc",
        "user": "user2",
        "gateway": "gw43",
        "value": 3
}

or something like that...
Any ideas? Is this possible?
Thanks!

Use mutate+split to convert [data] into an array, then use a split filter to convert the array into multiple events, then use dissect to pick out the three parts of the [data].

Hi @Badger
Thanks for the tip! I was finally able to get the result I wanted.
In case someone needs the same result, this is what my Logstash filter config looks like:

filter {
        dissect {
          mapping => {"message" => "%{fecha};%{timestamp};%{data}"}
        }

        date {
          match => ["timestamp", "UNIX"]
          target => "@fecha_exe"
        }

        mutate {
          split => ["data", ";"]
        }

        split {
          field => "data"
        }

        dissect {
          mapping => {"data" => "%{usuario},%{gateway},%{activecalls}"}
        }

        mutate {
          convert => {
            "usuario" => "string"
            "gateway" => "string"
            "activecalls" => "integer"
          }
        }
}

Thanks again!
Regards!
Ricardo
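
A possible follow-up for anyone reusing this, offered as an untested sketch rather than part of the accepted solution: the desired output in the question has @timestamp carrying the log time, while the filter above stores the parsed time in @fecha_exe. Assuming @timestamp is what you want, leaving out "target" makes the date filter write to @timestamp, which is its default. The mutate with remove_field is also an assumption: it drops the intermediate fields once each event has been split and dissected, so it would need to come after the last dissect.

```
filter {
        # No "target" here, so the parsed UNIX time goes to @timestamp (the date filter's default).
        date {
          match => ["timestamp", "UNIX"]
        }

        # Assumption: these helper fields are not needed in the indexed document.
        # Place this after the final dissect, since it removes [data].
        mutate {
          remove_field => ["fecha", "timestamp", "data"]
        }
}
```

If you still want the original values for debugging, keep the fields and skip the remove_field part.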
