Split filebeat log with a variable length into several new events

I'm fairly new with ELK, and i been struggling a lot trying to make this work, with no luck, so i'm asking for a way to solve this...
I'm using Filebeat to get logs into my elasticsearc server and, the line logs are like this:

2021 Mar 30 00:45:01;1617075901;user1,gw11,0;user1,gw22,5;user2,gw33,2;user2;gw43,3
2021 Mar 30 00:46:01;1617075961;user1,gw11,3;user1,gw22,5
2021 Mar 30 00:47:01;1617076021;user1,gw11,5;user1,gw22,5;user3,gw33,2;user3;gw43,3;user4,gw44,4;user4,gw55,3
and so on...

So, generally speaking, the line log is something like:

So far i was able to parse the first two fields with something like this:

input ...
filter {
        dissect {
          mapping => {"message" => "%{fecha};%{timestamp};%{data}"}

        date {
                match => ["timestamp", "UNIX"]
                target => "@fecha_exe"

So i have the %fecha, and the %timestamp fields parsed ok from the message...
The problem is the field %{data}. The field "%data" could be variable... and i need to parse it according the number of fields separated by the ";" and create a new event for each one of them...

So i would like to turn an input like this:
2021 Mar 30 00:45:01;1617075901;user1,gw11,0;user1,gw22,5;user2,gw33,2;user2;gw43,3

into four events like this:

        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:45:01"
        "_type": "doc",
         "user": "user1"
        "gateway": "gw11"

        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:45:01"
        "_type": "doc",
         "user": "user1"
        "gateway": "gw22"

        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:45:01"
        "_type": "doc",
         "user": "user2"
        "gateway": "gw33"

        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:45:01"
        "_type": "doc",
         "user": "user2"
        "gateway": "gw43"

or something like that...
Any ideas? Is this possible?

Use mutate+split to convert [data] into an array, then use a split filter to convert the array into multiple events, then use dissect to pick out the three parts of the [data].

Hi @Badger
Thanks for the TIP!... i was finally able to have like i wanted to !.
In case someone needs the same result.. this is how my logstash filter config file looks like:

filter {
        dissect {
          mapping => {"message" => "%{fecha};%{timestamp};%{data}"}

        date {
          match => ["timestamp", "UNIX"]
          target => "@fecha_exe"

        mutate {
          split => ["data", ";"]

        split {
          field => "data"

        dissect {
          mapping => {"data" => "%{usuario},%{gateway},%{activecalls}"}

        mutate {
          convert => {
                "usuario" => "string"
                "gateway" => "string"
                "activecalls" => "integer"

Thanks again!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.