Aggregate fields based on nested fields with a custom separator

I would like to aggregate some of the fields based on other nested fields.
Here is what my data looks like:

{
       "place" => "abc",
        "m_id" => 5099,
     "address" => "Kurt-Schumacher-Ring 15-17",
        "name" => "delta ",
    "@version" => "1",
        "v_id" => 4540,
        "s_id" => 12560,
         "plz" => "12345"
}
{
       "place" => "cdf",
        "m_id" => 5099,
     "address" => "Kurt-Schumacher-Ring 15-17",
        "name" => "delta",
    "@version" => "1",
        "v_id" => 4540,
        "s_id" => 12560,
         "plz" => "12345"
}
{
       "place" => "cbc",
        "m_id" => 529,
     "address" => "Kurt-Schumacher-Ring 15-17",
        "name" => "delta",
    "@version" => "1",
        "v_id" => 165,
        "s_id" => 12568,
         "plz" => "69000"
}
{
       "place" => "vbf",
        "m_id" => 529,
     "address" => "Kurt-Schumacher-Ring 15-17",
        "name" => "delta ",
    "@version" => "1",
        "v_id" => 165,
        "s_id" => 12568,
         "plz" => "60000"
}
{
       "place" => "xyz",
        "m_id" => 529,
     "address" => "Kurt-Schumacher-Ring 15-17",
        "name" => "delta ",
    "@version" => "1",
        "v_id" => 165,
        "s_id" => 12568,
         "plz" => "69999"
}

Now I want to aggregate them based on nested ids: [s_id][v_id][m_id], [s_id][v_id], or [s_id][m_id].

In the new output I want to create a new field "all" which should contain:
place | plz address @ place | plz address

So for the given input, the output should be:

{
         "all" => "abc | 12345 Kurt-Schumacher-Ring 15-17 @ cdf | 12345 Kurt-Schumacher-Ring 15-17",
        "m_id" => 5099,
        "name" => "delta ",
    "@version" => "1",
        "v_id" => 4540,
        "s_id" => 12560,
         "plz" => "12345"
}
{
         "all" => "cbc | 69000 Kurt-Schumacher-Ring 15-17 @ vbf | 60000 Kurt-Schumacher-Ring 15-17 @ xyz | 69999 Kurt-Schumacher-Ring",
        "m_id" => 529,
        "name" => "delta ",
    "@version" => "1",
        "v_id" => 165,
        "s_id" => 12568
}

How can I do this? Any pointers?

@fbaligand, my new issue.

My attempt to solve the problem so far:

filter {
  aggregate {
    task_id => "%{[s_id][v_id]}"
    code => "
      map['all'] ||= 0;
      map['all'] = event['place']
      map['all'] = event['plz']
      map['all'] = event['address']
    "
    # map_action => "create"
    push_previous_map_as_event => true
    timeout => 5
  }
}

It doesn't work so far.

@magnusbaeck, your input?

Which Logstash version do you use?
Looking at your input data, you have no nested fields, right?
What is the input? Jdbc?

Logstash version: 5.5.1. Yeah, this is jdbc input. I have nested fields, but this aggregation is on the parent data itself. So for this particular case, no nested fields.

I just want to combine the fields into a new field using custom separators here. Is the aggregate filter plugin not the right way to go for it? What else can I use here, @fbaligand?

  • First, the aggregate filter is the right plugin for your need, given you need to aggregate several events into one.
  • Concerning task_id, the syntax "%{[s_id][v_id]}" is for a nested field "v_id" inside "s_id". You have no nested fields; you want to build a composite task id based on 3 fields.
  • map['all'] should be initialized with an empty string, not the number 0, given you are concatenating strings.
  • The syntax event['key'] no longer works since Logstash 5; it is replaced by event.get('key') (see the snippet after this list).
  • If you want to concatenate fields, you should use the += operator, not the = operator.
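For illustration, a minimal sketch of the Logstash >= 5 event API inside a filter's Ruby code (field names borrowed from the data above):

  place = event.get('place')   # read a field (replaces event['place'])
  event.set('all', place)      # write a field (replaces event['all'] = ...)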

So here's the right configuration:

aggregate {
  task_id => "%{s_id}%{v_id}%{m_id}"
  code => "
    map['all'] ||= ""
    map['all'] += " @ " unless map['all'].empty?
    map['all'] += event.sprintf('%{place} | %{plz} %{address}')
  "
  push_previous_map_as_event => true
  timeout => 5
}

I see! Now I understand it a bit more. Thanks again!

I don't know why I'm getting this error message; I looked in a hex editor too.

ERROR logstash.agent - Cannot create pipeline {:reason=>"Expected one of #, {, } at line 35, column 21 (byte 1649) after filter{\n    aggregate {\n   task_id => \"%{s_id}%{v_id}\"\n   code => \"\n    map['all'] ||= \""}

Ok, the problem is with the quotes. One has to use single quotes instead of double quotes inside the code string. It works fine now.
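For reference, the code option itself is a double-quoted string, so any double quote inside it terminates the option early, which is exactly what the parse error above complains about:

  # Broken: the inner "" ends the double-quoted code option early
  #   code => " map['all'] ||= "" ... "
  # Fixed: single quotes inside the double-quoted option
  code => "map['all'] ||= ''"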

Sorry for the double quotes.

It is ok, it gave me a chance to learn more Ruby and Logstash syntax. By the way, I have another issue with this problem coming up now. Using the code above I get output which is a bit different from what I would expect.

I modified the code a little bit, but I am not sure how to proceed further.

filter {
  aggregate {
    task_id => "%{s_id}%{v_id}"
    code => "
      map['all'] ||= ''
      map['all'] += ' @ ' unless map['all'].empty?
      map['all'] += event.sprintf('%{place} | %{plz} %{address}')
      event.set('all', map['all'])
    "
    push_previous_map_as_event => true
    timeout => 5
  }
}

I added the event.set line, which creates a new field called all. This is what I want in the end. But it is also printing the combined values on their own after the aggregation, and I don't want that.

{
        "m_id" => 5099,
        "name" => "delta ",
    "@version" => "1",
        "v_id" => 4540,
        "s_id" => 12560,
         "plz" => "12345",
         "all" => "abc | 12345 Kurt-Schumacher-Ring 15-17 @ cdf | 12345 Kurt-Schumacher-Ring 15-17"
}
{
        "m_id" => 529,
        "name" => "delta ",
    "@version" => "1",
        "v_id" => 165,
        "s_id" => 12568,
         "all" => "cbc | 69000 Kurt-Schumacher-Ring 15-17 @ vbf | 60000 Kurt-Schumacher-Ring 15-17 @ xyz | 69999 Kurt-Schumacher-Ring"
}

It gives these additional values along with the ones above; I don't want this:
{
"all" => "abc | 12345 Kurt-Schumacher-Ring 15-17 @ cdf | 12345 Kurt-Schumacher-Ring 15-17"
}
{
"all" => "cbc | 69000 Kurt-Schumacher-Ring 15-17 @ vbf | 60000 Kurt-Schumacher-Ring 15-17 @ xyz | 69999 Kurt-Schumacher-Ring"
}

How do I get rid of these? Is there any better way to proceed?

Another question: will the aggregate filter take care of not repeating duplicated entries?

It seems you don't understand the aggregate filter behaviour when the push_previous_map_as_event => true option is set.
I really invite you to read the aggregate documentation, which is really detailed, especially example 4:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html

The main idea is:

  • for a given entity id, the jdbc input generates one event per join
  • the aggregate filter then aggregates the jdbc events for that entity id into one new aggregated event created from the map.

So it is normal, at the end, to have one new event per task id with only the "all" field, because "all" is the only field you set in the map.

If I understand your need correctly (I'm not sure), at the end you want to have only one event per task_id, with the aggregated all field.

If so, here's the right configuration, in my opinion:

aggregate {
  task_id => "%{s_id}%{v_id}"
  code => "
    map.merge!(event.to_hash) if map.empty?
    map['all'] ||= ''
    map['all'] += ' @ ' unless map['all'].empty?
    map['all'] += event.sprintf('%{place} | %{plz} %{address}')
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 5
}
  • The aggregate filter will not remove intermediate events on its own.
  • Furthermore, the aggregate filter will do only what your code tells it to.
  • So it won't automatically remove duplicates.
  • But the way to do that is to copy everything you need from the event into the map, then cancel the event (so it is dropped); at the end, aggregate generates a new event from the map with all the aggregated data, one event per task_id. So the task_id definition is the key to avoiding duplicates (see the sketch after this list).
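For illustration, a sketch of making the de-duplication explicit on top of that pattern, in case identical place/plz/address rows can repeat within one task_id. The 'seen' helper field is hypothetical, and since map fields end up on the pushed event, it is dropped afterwards with a mutate filter:

aggregate {
  task_id => "%{s_id}%{v_id}"
  code => "
    map.merge!(event.to_hash) if map.empty?
    map['seen'] ||= []
    entry = event.sprintf('%{place} | %{plz} %{address}')
    unless map['seen'].include?(entry)
      map['seen'] << entry
      map['all'] ||= ''
      map['all'] += ' @ ' unless map['all'].empty?
      map['all'] += entry
    end
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 5
}
mutate {
  remove_field => [ "seen" ]
}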

Yeah, I understand it. I understand that the duplicates are taken care of; I just wanted to confirm it, as I had not yet tested it on my data to make sure it works well.

Tested the script now. The confusion happened because I was printing the output to stdout (rubydebug). I think it is clear to me now.

I have some additional queries, more about Ruby, if you could help here.

I want to modify the output: format a digit as %02d and keep only the last 2 digits of the year. Here is what my string looks like now:

map['date'] += event.sprintf('%{group1}-%{group2}-%02d%{under}-%{year}')

I need to change the highlighted parts here: under is a single-digit number that I want to print as %02d, and year has 4 digits, of which I want to print only the last 2. Similarly, I want to uppercase and lowercase some fields.
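As far as I know, event.sprintf does not do zero-padding or substring extraction on arbitrary fields, so this has to happen in plain Ruby inside the code block. A minimal sketch, assuming under is numeric and year is a 4-digit value (all field names taken from the question, single quotes used so it fits inside the double-quoted code option):

  map['date'] ||= ''
  under2 = format('%02d', event.get('under').to_i)   # zero-pad to 2 digits
  year2  = event.get('year').to_s[-2, 2]             # last 2 digits of the year
  map['date'] += format('%s-%s-%s-%s', event.get('group1'), event.get('group2'), under2, year2)
  map['name'] = event.get('name').to_s.upcase        # .downcase for lowercase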

Everything you can do with "sprintf" is explained here:
https://www.elastic.co/guide/en/logstash/5.5/event-dependent-configuration.html#sprintf

In particular, there is a link detailing all the "time format" possibilities.
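One caveat worth noting (an aside, based on my reading of those docs): the %{+FORMAT} sprintf syntax formats the event's @timestamp using Joda-Time patterns, not arbitrary fields like year. For example:

  mutate {
    add_field => { "short_year" => "%{+yy}" }   # two-digit year of @timestamp, not of a custom field
  }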
