Logstash Pipeline Filter (*)

RE:
Hi,I am struggling to write a logstash pipeline filter section

i have used "TRANSFORMS" from kibana interface, it did not solve the case.

*** i want to filter and only output event that correspond to the maximum of completion per users ***

for instance :


n°   |  user |   completion

05   |  u2   |     20
 ... |  ...  |    ...
423  |  u48  |    100
424  | u49   |     -
425  | u50   |     0

here my trial:

input
{elasticsearch
{hosts => "...."
user => "..."
password => "..."
index => "index1"
codec =>"json"
docinfo => true
}}



filter {
aggregate {
task_id => "%{users}"
code => "map['completion'] = event.get('completion') ;
event.cancel if (map['completion']) != map['completion'].max()"
map_action => "create" }
}



output
{elasticsearch
{hosts => "..."
user => "..."
password => "..."
index => "index2"
document_type =>"%{[@metadata][_type]}"
document_id =>"%{[@metadata][_id]}"
}}

Thanks in advance.

That requires logstash to look into the future and predict what events will occur after the current event. Tricky.

I think the best you can do is to use push_map_as_event_on_timeout.

aggregate {
    task_id => "%{users}"
    code => '
        map["completion"] ||= 0
        c = event.get("completion")
        if c > map["completion"]
            map["completion"] = c
        end
        event.cancel
    '
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "users"
    timeout => 600 # 10 minutes timeout
}

Note that the event that is created will only contain the fields that you add to the map, so in this case it will have [completion] and [users] (because timeout_task_id_field is set). If you have other fields you want to preserve then add them to the map.

thank you @Badger ,
the output is actualy something like this:

users     |     completion
   -      |      number
   -      |      number
   u49    |       -
   -      |      number

users field is no longer outputed ,therefore we can not know for which users the field completion correspond.

If the [users] field is only output when it changes then you could use a ruby filter to add it back in. You need order preserved, so make sure that pipeline.workers is 1 and pipeline.ordered evaluates to true.

ruby {
    init => '@user = nil'
    code => '
        user = event.get("users")
        if user
            @user = user
        else
            event.set("users", @user)
        end
    '
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.