Split a variable-length Filebeat log line into multiple events grouped by a field

Hi.
I'm trying to parse the information from a log line with this format.

2021 Mar 30 00:45:01;1617075901;user1,gw11,0;user1,gw22,5;user2,gw33,2;user2;gw43,3

I'm using dissect to format the first part of the log, with this:

dissect {
  mapping => {"message" => "%{fecha};%{timestamp};%{data}"}
}
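
For anyone who wants to check what that mapping yields without starting Logstash, here is a rough plain-Ruby approximation. It is only a sketch: dissect_line is a hypothetical helper, not a Logstash function, and it assumes the first two semicolons always separate fecha and timestamp from the rest of the line.

```ruby
# Plain-Ruby approximation of the dissect mapping "%{fecha};%{timestamp};%{data}".
# dissect_line is a hypothetical helper, not part of Logstash.
def dissect_line(message)
  # Split on the first two semicolons only; everything after them stays in data.
  fecha, timestamp, data = message.split(";", 3)
  { "fecha" => fecha, "timestamp" => timestamp, "data" => data }
end

fields = dissect_line("2021 Mar 30 00:46:01;1617075961;user1,gw11,3;user1,gw22,5")
puts fields["fecha"]      # 2021 Mar 30 00:46:01
puts fields["timestamp"]  # 1617075961
puts fields["data"]       # user1,gw11,3;user1,gw22,5
```

The variable-length part ends up whole in "data", which is the field the rest of this thread works on.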

The "data" field has a variable number of entries:

2021 Mar 30 00:45:01;1617075901;user1,gw11,0;user1,gw22,5;user2,gw33,2;user2;gw43,3
or
2021 Mar 30 00:46:01;1617075961;user1,gw11,3;user1,gw22,5
or
2021 Mar 30 00:47:01;1617076021;user1,gw11,5;user1,gw22,5;user3,gw33,2;user3;gw43,3;user4,gw44,4;user4,gw55,3
and so on...

What I need is to transform this log into multiple events, specifically one event per "user".
I'm struggling to parse the "data" field and add up the integer values into a sum per user.

For example, a log line like this:
2021 Mar 30 00:46:01;1617075961;user1,gw11,3;user1,gw22,5

Needs to be an event like this:

event 1
{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:46:01",
        "_type": "doc",
        "user": "user1",
        "gateways": [
          {
                "name": "gw11",
                "gwcalls": 3
          },
          {
                "name": "gw22",
                "gwcalls": 5
          }
        ],
        "totalcalls": 8
}

And a log line like this:
2021 Mar 30 00:47:01;1617076021;user1,gw11,5;user1,gw22,5;user3,gw33,2;user3;gw43,3;user4,gw44,4;user4,gw55,3

Needs to be transformed into 3 events like this:

event 1
{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:47:01",
        "_type": "doc",
        "user": "user1",
        "gateways": [
          {
                "name": "gw11",
                "gwcalls": 5
          },
          {
                "name": "gw22",
                "gwcalls": 5
          }
        ],
        "totalcalls": 10
}


event 2
{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:47:01",
        "_type": "doc",
        "user": "user3",
        "gateways": [
          {
                "name": "gw33",
                "gwcalls": 2
          },
          {
                "name": "gw43",
                "gwcalls": 3
          }
        ],
        "totalcalls": 5
}

event 3
{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:47:01",
        "_type": "doc",
        "user": "user4",
        "gateways": [
          {
                "name": "gw44",
                "gwcalls": 4
          },
          {
                "name": "gw55",
                "gwcalls": 3
          }
        ],
        "totalcalls": 7
}

Is this possible? Could someone point me in the right direction?
Thanks!

Is user2;gw43,3 a typo? I ask because the third line has the same ;/, issue at the same position.

If so, the following code will work; if not, you will have to adjust the regexp.

    dissect { mapping => { "message" => "%{fecha};%{timestamp};%{data}" } }
    ruby {
        code => '
            matches = event.get("data").scan(/(\w+),(\w+),(\w+)($|;)/)
            # Matches is an array of arrays like: [["user1", "gw11", "3", ";"], ["user1", "gw22", "5", ""]]
            users = {}

            # For each array of four matches...
            matches.each { |x|
                users[x[0]] ||= {}
                users[x[0]]["gateways"] ||= []

                users[x[0]]["user"] = x[0]
                # gateways is an array of hashes
                users[x[0]]["gateways"] << { "name" => x[1], "gwcalls" => x[2].to_i }
            }
            # If users is a hash { "key" => "value" } this converts it to [ "key", "value" ]
            users = users.to_a
            # Next we throw away the keys (usernames) since those are also inside the value hash
            newUsers = []
            users.each { |x|
                newUsers << x[1]
            }
            event.set("users", newUsers)
        '
    }
    split { field => "users" }
    ruby {
        code => '
            # Move contents of the [users] field to the top level
            event.get("users").each { |k, v|
                event.set(k, v)
            }
            event.remove("users")

            # Sum up the calls
            totalcalls = 0
            event.get("gateways").each { |x|
                totalcalls += x["gwcalls"]
            }
            event.set("totalcalls", totalcalls)
        '
    }

(If it is not a typo, /(\w+)[[:punct:]](\w+)[[:punct:]](\w+)($|;)/ would work...)
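
To try the grouping logic outside Logstash, a minimal plain-Ruby sketch could look like the following. group_by_user is a hypothetical helper that mirrors the first ruby filter above; it assumes comma-separated triples and plain \w tokens, as in the corrected sample lines.

```ruby
# group_by_user is a hypothetical helper that mirrors the first ruby filter
# outside Logstash, so it can be run with plain `ruby`.
def group_by_user(data)
  users = {}
  # scan returns one array per triple: [user, gateway, calls, separator]
  data.scan(/(\w+),(\w+),(\w+)($|;)/).each do |user, gateway, calls, _sep|
    users[user] ||= { "user" => user, "gateways" => [] }
    users[user]["gateways"] << { "name" => gateway, "gwcalls" => calls.to_i }
  end
  # Keep only the per-user hashes; the keys are repeated inside them.
  users.values
end

events = group_by_user("user1,gw11,5;user1,gw22,5;user4,gw44,4;user4,gw55,3")
events.each do |e|
  total = e["gateways"].sum { |g| g["gwcalls"] }
  puts "#{e["user"]}: #{total}"
end
# user1: 10
# user4: 7
```

Each element of the returned array corresponds to one event after the split filter.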

Yes, it is a typo, sorry about that. A correct log line should be:

2021 Mar 30 00:45:01;1617075901;user1,gw11,0;user1,gw22,5;user2,gw33,2;user2,gw43,3
or
2021 Mar 30 00:46:01;1617075961;user1,gw11,3;user1,gw22,5
or
2021 Mar 30 00:47:01;1617076021;user1,gw11,5;user1,gw22,5;user3,gw33,2;user3,gw43,3;user4,gw44,4;user4,gw55,3
and so on...

OK, so the code as written should work.

Almost there! Thanks @Badger

For a "data" field like this:
usuario1,gate1.usuario1.01,26;usuario1,gate2.usuario1.02,1;user2,gw.01,12;user2,gw.02,7
I'm getting this from Logstash:

{
  "_index": "calls",
  "_type": "_doc",
  "_id": "Wiv5sngBtkTNQGR8W8JZ",
  "_version": 1,
  "_score": null,
  "_source": {
    "input": {},
    "@version": "1",
    "timestamp": "1617910381",
    "gateways": [
      {
        "name": "01",
        "gwcalls": 12
      },
      {
        "name": "02",
        "gwcalls": 7
      }
    ],
    "data": "usuario1,gate1.usuario1.01,26;usuario1,gate2.usuario1.02,1;user2,gw.01,12;user2,gw.02,7",
    "host": {
      "name": "eksdata"
    },
    "ecs": {},
    "message": "2021 Apr 8 15:33:01;1617910381;usuario1,gate1.usuario1.01,26;usuario1,gate2.usuario1.02,1;user2,gw.01,12;user2,gw.02,7",
    "fields": {
      "type": "outbound-activecalls"
    },
    "user": "gw",
    "totalcalls": 19,
    "@fecha_exe": "2021-04-08T19:33:01.000Z",
    "agent": {
      "name": "eksdata"
    },
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "log": {
      "file": {
        "path": "/var/log/metricas/outbound.activecalls"
      }
    },
    "@timestamp": "2021-04-08T19:33:07.342Z",
    "fecha": "2021 Apr 8 15:33:01"
  },
  "fields": {
    "@timestamp": [
      "2021-04-08T19:33:07.342Z"
    ],
    "@fecha_exe": [
      "2021-04-08T19:33:01.000Z"
    ]
  },
  "sort": [
    1617910381000
  ]
}

Maybe the problem is in the [[:punct:]]?
In the "gateways" array I only get the trailing number, not the full gateway name.

I also tried the first version of the code:
matches = event.get("data").scan(/(\w+),(\w+),(\w+)($|;)/)

but with it Logstash did not process the Filebeat data at all.

Ricardo

A gateway name like gate1.usuario1.01 contains periods, and \w+ does not match those.

The solution might be [\w.]+ or it might be more complicated than that depending on what characters can occur in that field.
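
A small plain-Ruby comparison may make the difference visible. This is only a sketch: the second pattern is a variation of the suggestion above, and it assumes the separators inside a triple are always commas and the call count is always digits.

```ruby
data = "usuario1,gate1.usuario1.01,26;user2,gw.01,12"

# \w matches letters, digits and underscore only, so a period ends the token,
# and [[:punct:]] then accepts that period as if it were a separator.
loose = data.scan(/(\w+)[[:punct:]](\w+)[[:punct:]](\w+)($|;)/)

# Allowing periods inside each token keeps the gateway name in one piece.
fixed = data.scan(/([\w.]+),([\w.]+),(\d+)(?:;|$)/)

p loose  # [["usuario1", "01", "26", ";"], ["gw", "01", "12", ""]]
p fixed  # [["usuario1", "gate1.usuario1.01", "26"], ["user2", "gw.01", "12"]]
```

The first result shows the same symptom as the output above: "gw" ends up as the user and "01" as the gateway name.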

Indeed, that made it work.
The matching line is now:
matches = event.get("data").scan(/([\w.]+)[[:punct:]]([\w.]+)[[:punct:]]([\w.]+)($|;)/)

Thank you @Badger.
One more question (for anyone who can answer). Maybe it is too basic, but I'm still learning, so please don't be too hard on me.
As you can probably guess, I'm collecting this information from a SIP device that makes and receives SIP calls.
I'm trying to aggregate the information per user and per direction (outbound or inbound calls). With the totalcalls, user and fields.type fields I have all the information I need, but I also want to store the calls per gateway in case I need to break totalcalls down by gateway. I thought I could do that with the scheme I asked about before:

{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:47:01",
        "_type": "doc",
        "user": "user1",
        "gateways": [
          {
                "name": "gw11",
                "gwcalls": 3
          },
          {
                "name": "gw22",
                "gwcalls": 5
          }
        ],
        "totalcalls": 8
}

But when I try to view the information per gateway in the Discover tab in Kibana, it shows a question mark, as indicated in the image.

Is that OK?
I was thinking that maybe I could access the data in the Discover tab through something like a
gateways.name field.

Maybe that's not the best way to store the data collected for a user's gateways.

So, can I have suggestions on how to store the gateway information?
I was thinking that maybe it is better to have something like this:

{
        "_index": "index1",
        "@timestamp": "2021 Mar 30 00:47:01",
        "_type": "doc",
        "user": "user1",
        "gateway-1": {
                "name": "gw11",
                "gwcalls": 3
        },
        "gateway-2": {
                "name": "gw22",
                "gwcalls": 5
        },
        "totalcalls": 8
}

How much would the script have to change to collect the data this way?

Thanks!

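Outside a character class an unescaped . matches any character, but inside [...] it is already a literal period, so [\w.]+ and [\w\.]+ behave the same. I meant to write it escaped and didn't notice that the forum ate my backslash. ([\w\.]+) makes the intent explicit.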

Immediately before event.set you could add this code to reformat the gateways

newUsers.each { |x|
    x["gateways"].each_index { |y|
        x["gateway-#{y+1}"] = x["gateways"][y]
    }
    x.delete("gateways")
}

That has not been tested. If it does not work let me know and I will take a look in about 15 hours.
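
The reshaping step can be tried on its own in plain Ruby. The sketch below uses made-up sample data in the same shape as newUsers and the snake_case name new_users; it is an illustration, not the tested filter.

```ruby
# Sample data in the same shape as newUsers in the filter (values are made up).
new_users = [
  { "user" => "user1",
    "gateways" => [
      { "name" => "gw11", "gwcalls" => 3 },
      { "name" => "gw22", "gwcalls" => 5 }
    ] }
]

new_users.each do |u|
  # Copy each array element to its own numbered key: gateway-1, gateway-2, ...
  u["gateways"].each_with_index do |gw, i|
    u["gateway-#{i + 1}"] = gw
  end
  # The original array is no longer needed.
  u.delete("gateways")
end

p new_users.first.keys  # ["user", "gateway-1", "gateway-2"]
```

Note that the number of gateway-N fields then depends on the log line, so the set of fields can differ from event to event.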

Thanks @Badger
I will try this tomorrow.... i will let you know!
Thanks!

Hi @Badger
I added the code, but nothing seems to change.....

    ruby {
      code => '
        # matches = event.get("data").scan(/(\w+),(\w+),(\w+)($|;)/)
        matches = event.get("data").scan(/([\w\.]+)[[:punct:]]([\w\.]+)[[:punct:]]([\w\.]+)($|;)/)
        # Matches is an array of arrays like: [["user1", "gw11", "3", ";"], ["user1", "gw22", "5", ""]]
        users = {}

        # For each array of four matches...
        matches.each { |x|
            users[x[0]] ||= {}
            users[x[0]]["gateways"] ||= []

            users[x[0]]["user"] = x[0]
            # gateways is an array of hashes
            users[x[0]]["gateways"] << { "name" => x[1], "gwcalls" => x[2].to_i }
        }
        # If users is a hash { "key" => "value" } this converts it to [ "key", "value" ]
        users = users.to_a
        # Next we throw away the keys (usernames) since those are also inside the value hash
        newUsers = []
        users.each { |x|
            newUsers << x[1]
        }
        event.set("users", newUsers)

==== added here ====

        newUsers.each { |x|
            x["gateways"].each_index { |y|
                    x["gateway-#{y+1}"] = x["gateways"][y]
            }
            x.delete("gateways")
        }
    '
    }

==================

    split { field => "users" }

    ruby {
      code => '
        # Move contents of the [users] field to the top level
        event.get("users").each { |k, v|
            event.set(k, v)
        }
        event.remove("users")

        # Sum up the calls
        totalcalls = 0
        event.get("gateways").each { |x|
            totalcalls += x["gwcalls"]
        }
        event.set("totalcalls", totalcalls)
    '
    }

But nothing seems to change in the output...

{
  "_index": "calls",
  "_type": "_doc",
  "_id": "ICtGt3gBtkTNQGR8vdWT",
  "_version": 1,
  "_score": null,
  "_source": {
    "user": "usuario1",
    "@timestamp": "2021-04-09T15:36:07.036Z",
    "input": {},
    "totalcalls": 9,
    "@fecha_exe": "2021-04-09T15:36:01.000Z",
    "log": {
      "file": {
        "path": "/var/log/metricas/inbound.activecalls"
      }
    },
    "timestamp": "1617982561",
    "fields": {
      "type": "inbound-activecalls"
    },
    "gateways": [
      {
        "name": "gate1.usuario1.01",
        "gwcalls": 4
      },
      {
        "name": "gate2.usuario1.02",
        "gwcalls": 5
      }
    ],
    "host": {
      "name": "eksdata"
    },
    "message": "2021 Apr 9 11:36:01;1617982561;usuario1,gate1.usuario1.01,4;usuario1,gate2.usuario1.02,5;user2,gw.01,1;user2,gw.02,7",
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "data": "usuario1,gate1.usuario1.01,4;usuario1,gate2.usuario1.02,5;user2,gw.01,1;user2,gw.02,7",
    "fecha": "2021 Apr 9 11:36:01",
    "ecs": {},
    "agent": {
      "name": "eksdata"
    },
    "@version": "1"
  },
  "fields": {
    "@timestamp": [
      "2021-04-09T15:36:07.036Z"
    ],
    "@fecha_exe": [
      "2021-04-09T15:36:01.000Z"
    ]
  },
  "sort": [
    1617982561000
  ]
}

Ricardo.-

You have to add it before the event.set call.
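
The unchanged output above is consistent with event.set storing its own converted copy of the value, so that later changes to the Ruby array never reach the event. The sketch below only illustrates that ordering effect: StubEvent is a hypothetical stand-in written for this example, and its deep copy is an assumption about how the real event behaves, not Logstash code.

```ruby
# StubEvent is a hypothetical stand-in for the Logstash event, written only
# to illustrate the ordering issue. It assumes set() keeps its own copy.
class StubEvent
  def initialize
    @fields = {}
  end

  def set(key, value)
    @fields[key] = Marshal.load(Marshal.dump(value)) # deep copy of the value
  end

  def get(key)
    @fields[key]
  end
end

users = [{ "user" => "user1", "gateways" => [{ "name" => "gw11", "gwcalls" => 3 }] }]

late = StubEvent.new
late.set("users", users)
users.each { |u| u["renamed"] = true }       # mutation after set: not stored
p late.get("users").first.key?("renamed")    # false

early = StubEvent.new
early.set("users", users)                    # users was already mutated above
p early.get("users").first.key?("renamed")   # true
```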

Sorry about that!
I corrected it and now I have the new fields in the Kibana Discover tab.

But the totalcalls field, computed in the second ruby block, is no longer adding up the gwcalls.
I'm guessing that's because the "gateways" field does not exist anymore; instead we now have gateway-1, gateway-2, etc. So, is it possible to use something like this instead?

    # Sum up the calls
    totalcalls = 0
    event.get("gateways-*").each { |x|
        totalcalls += x["gwcalls"]
    }
    event.set("totalcalls", totalcalls)

Regards,
Ricardo
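
As far as I know, event.get takes a single field reference and does not expand wildcards, so a pattern like that would not return the gateway fields. One possible alternative is to filter the field names yourself. The sketch below uses a plain hash as a stand-in for what event.to_hash would return after the split; the sample values are taken from the output earlier in the thread.

```ruby
# A plain hash standing in for event.to_hash after the split filter.
fields = {
  "user" => "usuario1",
  "gateway-1" => { "name" => "gate1.usuario1.01", "gwcalls" => 4 },
  "gateway-2" => { "name" => "gate2.usuario1.02", "gwcalls" => 5 }
}

# Keep only the gateway-N entries and add up their call counts.
gateway_fields = fields.select { |key, _value| key.start_with?("gateway-") }
totalcalls = gateway_fields.values.sum { |gw| gw["gwcalls"] }

puts totalcalls  # 9
```

Inside the ruby filter the result would then be stored with event.set("totalcalls", totalcalls).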

You could change it to do the rollup as it goes. Instead of

            users[x[0]] ||= {}
            users[x[0]]["gateways"] ||= []

            users[x[0]]["user"] = x[0]
            # gateways is an array of hashes
            users[x[0]]["gateways"] << { "name" => x[1], "gwcalls" => x[2].to_i }

try

            users[x[0]] ||= {}
            users[x[0]]["gateways"] ||= []
            users[x[0]]["totalcalls"] ||= 0

            users[x[0]]["user"] = x[0]
            # gateways is an array of hashes
            users[x[0]]["gateways"] << { "name" => x[1], "gwcalls" => x[2].to_i }
            users[x[0]]["totalcalls"] += x[2].to_i
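
The same rollup can be checked in plain Ruby. This is a sketch under a few assumptions: it uses a Hash default block instead of the ||= lines, and a regexp variation that expects comma separators and a numeric call count.

```ruby
data = "usuario1,gate1.usuario1.01,4;usuario1,gate2.usuario1.02,5;user2,gw.01,1;user2,gw.02,7"

# Every new user starts with an empty gateway list and a zero total.
users = Hash.new do |hash, user|
  hash[user] = { "user" => user, "gateways" => [], "totalcalls" => 0 }
end

data.scan(/([\w.]+),([\w.]+),(\d+)(?:;|$)/).each do |user, gateway, calls|
  users[user]["gateways"] << { "name" => gateway, "gwcalls" => calls.to_i }
  # Roll the total up while the triples are being read.
  users[user]["totalcalls"] += calls.to_i
end

users.each_value { |u| puts "#{u["user"]}: #{u["totalcalls"]}" }
# usuario1: 9
# user2: 8
```

Because the total is kept next to the gateways, it survives any later renaming or removal of the "gateways" field.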

That worked perfectly!
Thanks @Badger for all your help!
In case anyone in the group needs something similar, this is the final code:

    ruby {
      code => '
        matches = event.get("data").scan(/([\w\.]+)[[:punct:]]([\w\.]+)[[:punct:]]([\w\.]+)($|;)/)
        # Matches is an array of arrays like: [["user1", "gw11", "3", ";"], ["user1", "gw22", "5", ""]]
        users = {}

        # For each array of four matches...
        matches.each { |x|
            users[x[0]] ||= {}
            users[x[0]]["gateways"] ||= []
            users[x[0]]["totalcalls"] ||= 0

            users[x[0]]["user"] = x[0]
            # gateways is an array of hashes
            users[x[0]]["gateways"] << { "name" => x[1], "gwcalls" => x[2].to_i }
            users[x[0]]["totalcalls"] += x[2].to_i
        }
        # If users is a hash { "key" => "value" } this converts it to [ "key", "value" ]
        users = users.to_a
        # Next we throw away the keys (usernames) since those are also inside the value hash
        newUsers = []
        users.each { |x|
            newUsers << x[1]
        }

        newUsers.each { |x|
            x["gateways"].each_index { |y|
                    x["gateway-#{y+1}"] = x["gateways"][y]
            }
            x.delete("gateways")
        }

        event.set("users", newUsers)
    '
    }

    split { field => "users" }

    ruby {
      code => '
        # Move contents of the [users] field to the top level
        event.get("users").each { |k, v|
            event.set(k, v)
        }
        event.remove("users")
    '
    }

Best Regards,
Ricardo
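
For a quick end-to-end check without a running pipeline, the whole chain can be approximated in plain Ruby. This is a sketch, not the filter itself: events_for is a hypothetical helper that stands in for dissect, both ruby blocks and split, and it assumes comma-separated triples with numeric call counts.

```ruby
# events_for is a hypothetical helper that approximates dissect, the two
# ruby filters and split in one plain-Ruby function.
def events_for(message)
  fecha, timestamp, data = message.split(";", 3)
  users = {}
  data.scan(/([\w.]+),([\w.]+),(\d+)(?:;|$)/).each do |user, gateway, calls|
    u = users[user] ||= { "fecha" => fecha, "timestamp" => timestamp,
                          "user" => user, "totalcalls" => 0 }
    # Number the gateways per user: gateway-1, gateway-2, ...
    n = u.keys.count { |k| k.start_with?("gateway-") } + 1
    u["gateway-#{n}"] = { "name" => gateway, "gwcalls" => calls.to_i }
    u["totalcalls"] += calls.to_i
  end
  # One hash per user, like one event per user after split.
  users.values
end

line = "2021 Apr 9 11:36:01;1617982561;usuario1,gate1.usuario1.01,4;" \
       "usuario1,gate2.usuario1.02,5;user2,gw.01,1;user2,gw.02,7"
result = events_for(line)
result.each { |e| p e }
```

With the sample line above, the first hash is for usuario1 with a totalcalls of 9 and the second is for user2 with a totalcalls of 8.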