Logstash pipeline filter

Hi,
I am new to ELK and struggling to write my first Logstash pipeline.
Can anyone help me with the filter section?
Thanks in advance.

I want to filter the data and output only the maximum completion per user.

For instance:

user   completion
u2     20
...
u48    100

This is my attempt, without success:

input {
  elasticsearch {
    hosts => "...."
    user => "..."
    password => "..."
    index => "index1"
    codec => "json"
    docinfo => true
  }
}



filter {
  aggregate {
    task_id => "%{users}"
    code => "map['completion'] = event.get('completion');
             event.cancel if (map['completion']) != map['completion'].max()"
    map_action => "create"
  }
}

output {
  elasticsearch {
    hosts => "..."
    user => "..."
    password => "..."
    index => "index2"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
  }
}

Can someone help me please? Thank you!

Hi,

What is the reason you use Logstash?
If the input and the output are on the same cluster, a Transform may be a good option.

You can aggregate the data and write it into another destination index periodically/continuously.
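For illustration, a continuous transform along these lines computes the per-user maximum. This is only a sketch: the transform id, index names, and field names (`users`, `completion`, `@timestamp`) are assumptions, not taken from your setup.

```
PUT _transform/max-completion-per-user
{
  "source": { "index": "index1" },
  "dest":   { "index": "index2" },
  "pivot": {
    "group_by": {
      "users": { "terms": { "field": "users" } }
    },
    "aggregations": {
      "max_completion": { "max": { "field": "completion" } }
    }
  },
  "sync": { "time": { "field": "@timestamp", "delay": "60s" } },
  "frequency": "1m"
}
```

After creating it, start it with `POST _transform/max-completion-per-user/_start`; with the `sync` block it keeps updating the destination index continuously.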

Thank you @Tomo_M,
I have several clusters on which I installed Elasticsearch.

Then, how about using a query in the input and aggregating the data there, without the aggregate filter? That is more efficient, and you don't have to reinvent the wheel for a max aggregation.
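For reference, the plain Elasticsearch aggregation that computes this is a terms bucket with a max sub-aggregation (field names assumed; also note that the elasticsearch input plugin returns search hits by default, so check whether your plugin version supports emitting aggregation results before relying on this):

```
GET index1/_search
{
  "size": 0,
  "aggs": {
    "per_user": {
      "terms": { "field": "users", "size": 10000 },
      "aggs": {
        "max_completion": { "max": { "field": "completion" } }
      }
    }
  }
}
```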

I suppose map['completion'] is a single value and therefore not compatible with the max() function. Also, your first line, map['completion'] = event.get('completion'), overwrites map['completion'] on every event, even when event.get('completion') is less than the current map['completion'].
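To see why, here is the same thing in plain Ruby, outside Logstash:

```ruby
# max comes from Enumerable, so it is defined on collections such as
# arrays, not on a single numeric value.
puts [20, 45, 100].max      # prints 100

# A bare number has no max method, which is why the filter raised
# an "undefined method" error.
puts 20.respond_to?(:max)   # prints false
```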

Yes, it's not compatible with the max() function; I got an "undefined function or method" error. I don't master my lines of code very well. I wanted to get the maximum completion value for every user,
so I was thinking I first had to create a map, and then find the maximum value for completion...

That strategy has some problems, because Logstash runs the code event by event.
Logstash can't detect which is the last event for a specific user, and map['completion'] can only hold an array or a value, not both.

I haven't tried it, but how about the following:

event.cancel if !(event.get('completion'));
map['completion'] ||= event.get('completion');
map['completion'] = [map['completion'], event.get('completion')].max;
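To make the intent concrete, here is a plain-Ruby simulation of those three lines, with `map` as a Hash and each event reduced to a Hash (the sample values are made up):

```ruby
map = {}
events = [
  { 'completion' => 20 },
  { 'completion' => nil },   # no completion field: would be cancelled
  { 'completion' => 15 },
  { 'completion' => 100 },
]

events.each do |event|
  next if !(event['completion'])              # stands in for event.cancel
  map['completion'] ||= event['completion']   # seed the map on the first event
  map['completion'] = [map['completion'], event['completion']].max
end

puts map['completion']   # prints 100
```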

But I strongly recommend that you either:

  • use an aggregation query in the Logstash input, or
  • use an aggregating transform in the source cluster, and use Logstash simply to extract and update.

Thank you @Tomo_M,
it now runs without errors, but the output is not what I expected...
I want it to return, for each user, the line with the highest completion.
I've been struggling with this for two weeks.

In the end I had to drop the original events:

input {
  elasticsearch {
    hosts => "localhost:9200"
    index => ""
    user => ""
    password => ""
    codec => "json"
    docinfo => true
    schedule => "* * * * *"
  }
}
# Note: the aggregate filter requires a single pipeline worker (-w 1).
filter {
  aggregate {
    task_id => "%{users}"
    # Keep a running maximum per user and cancel every original event;
    # one summary event per user is pushed when the timeout expires.
    code => "event.cancel if !(event.get('completion'));
             map['completion'] ||= event.get('completion');
             map['completion'] = [map['completion'], event.get('completion')].max;
             event.cancel"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "users"   # keep the user id on the pushed event
    timeout => 1
  }
  # Drop any event that still has no completion value.
  if !([completion]) { drop {} }
}
output {
  stdout { codec => rubydebug }
}
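For anyone landing here later, this plain-Ruby sketch shows conceptually what the pipeline above does: one aggregate map per task_id, every original event cancelled, and each map pushed as a fresh event when the timeout fires (sample data made up):

```ruby
maps = Hash.new { |h, k| h[k] = {} }   # one map per user, keyed by task_id

events = [
  { 'users' => 'u2',  'completion' => 20 },
  { 'users' => 'u48', 'completion' => 40 },
  { 'users' => 'u2',  'completion' => 5 },
  { 'users' => 'u48', 'completion' => 100 },
]

events.each do |event|
  next if !(event['completion'])       # first event.cancel
  map = maps[event['users']]
  map['completion'] ||= event['completion']
  map['completion'] = [map['completion'], event['completion']].max
  # trailing event.cancel: the original event never reaches the output
end

# push_map_as_event_on_timeout: each map becomes one new event per user
flushed = maps.map { |users, map| map.merge('users' => users) }
flushed.each { |e| puts e.inspect }
```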