Aggregate logs without a special pattern or "flag" marking the last log of a group

This might be related to the discussion in "Aggregate data based on some field":

Hi everyone,

I have incoming logs in Logstash that look like this:

id1 valueA, id1 valueB, id1 valueC, id1 valueD, id2 valueAA, id2 valueBB, id2 valueCC, id2 valueDD, id2 valueEE, id2 valueFF, id3 valueAAA, ....

  1. Logs arrive in groups that share the same id. Once a group is done, a new group begins, and there is no risk of receiving another log from a previous group.
  2. Each group can have any number of logs.
  3. There is no "flag" like "END" or "Complete" to indicate the last log of a group.

In the end, my goal is to obtain Elasticsearch docs like these:
{ "group_id" : "id1", "values" : ["valueA", "valueB", "valueC", "valueD"] },
{ "group_id" : "id2", "values" : ["valueAA", "valueBB", "valueCC", "valueDD", "valueEE", "valueFF"] },
{ "group_id" : "id3", "values" : ["valueAAA", ...] }, ...
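To make the target concrete, here is a minimal plain-Ruby sketch of the transformation, run outside Logstash on a hypothetical, shortened sample. It assumes the whole input is available at once, which is not the case in a streaming pipeline, so it only illustrates the desired result, not how to get it in Logstash.

```ruby
# Hypothetical sample input: [id, value] pairs in arrival order.
logs = [
  ['id1', 'valueA'], ['id1', 'valueB'],
  ['id2', 'valueAA'], ['id2', 'valueBB'], ['id2', 'valueCC'],
  ['id3', 'valueAAA']
]

# Enumerable#chunk groups *consecutive* elements that share the same key,
# which matches the guarantee that groups never interleave.
docs = logs.chunk { |pair| pair.first }.map do |id, pairs|
  { 'group_id' => id, 'values' => pairs.map(&:last) }
end

docs.each { |doc| puts doc }
```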

How can I proceed?

I suspect I have to use the aggregate filter plugin, but all the examples I found in both the docs and this forum had an "END" flag in the last log to trigger end_of_task => true.

I guess I have to store the value of the id somewhere, since I will only know that group 1 is done when group 2 enters the pipeline?

I could append the values of group 1 to an array like this:
code => "map['value_array'] ||= [] ; map['value_array'].push(event['current_value']) ;"
and then create an ES doc from the stored values in the elasticsearch output plugin when current_id != last_id (which still leaves a problem for the very last group). But I don't know whether the "if" conditions must go inside or outside the aggregate plugin, nor how to write this in Ruby.
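The "emit when the id changes" idea can be sketched in plain Ruby as below. GroupBuffer is a hypothetical helper, not part of Logstash or any plugin; it only illustrates the control flow, including the explicit final flush that the last group needs.

```ruby
# Hypothetical helper (not a Logstash API): buffers the values of the
# current group and hands back the finished group when the id changes.
class GroupBuffer
  def initialize
    @current_id = nil
    @values = []
  end

  # Returns the completed previous group (a Hash) if the id changed, else nil.
  def push(id, value)
    finished = nil
    finished = flush if !@current_id.nil? && id != @current_id
    @current_id = id
    @values << value
    finished
  end

  # Returns whatever is buffered and resets the state. Must be called once
  # at end of input (or on a timeout), otherwise the last group is lost.
  def flush
    return nil if @current_id.nil?
    doc = { 'group_id' => @current_id, 'values' => @values }
    @current_id = nil
    @values = []
    doc
  end
end

buffer = GroupBuffer.new
docs = []
[['id1', 'valueA'], ['id1', 'valueB'], ['id2', 'valueAA']].each do |id, value|
  finished = buffer.push(id, value)
  docs << finished if finished
end
last = buffer.flush
docs << last if last
```

In a real pipeline there is no "end of input", so the final flush would have to be driven by a timeout. The aggregate filter documentation describes a push_previous_map_as_event option together with timeout for this kind of case; whether it is available depends on the plugin version, so treat that as something to check rather than a given.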

Has anyone encountered something similar and can show what the structure would be?
Thanks for your thoughts!

PS: Sorry for any mistakes. English is not my native language.

No solution found yet. I explored the "multiline" plugin option too, but it seems that the last or the first log of a group would end up associated with another group...

Even a wild guess is welcome at this point.... :frowning:

I managed to find something, inspired by this use case: Filter Plugin: Elasticsearch

It uses the ruby filter plugin and could probably be optimised. I have changed my strategy since my first post.

filter {
  ruby {
    # State is shared and event order matters, so this assumes a single pipeline worker.
    init => "
        @@map = {}
        @@map['list_of_values_for_this_group'] = []
        @@map['group_of_previous_event'] = 'start'
    "
    # event['field'] is the pre-5.0 event API; on Logstash 5.0+ use event.get('field') / event.set('field', value)
    code => "
        @@map['current_group'] = event['group_id']
        # if it's not the first event and we just changed group, reset the list
        if (@@map['group_of_previous_event'] != 'start' && @@map['group_of_previous_event'] != @@map['current_group'])
            @@map['list_of_values_for_this_group'] = []
        end
        @@map['list_of_values_for_this_group'].push(event['value']) # put the new value in the list
        # publish a copy of the list on every event so the last one is not lost
        event['values'] = @@map['list_of_values_for_this_group'].dup
        @@map['group_of_previous_event'] = event['group_id']
    "
  }
}

output {
  elasticsearch {
    action => "update"
    doc_as_upsert => true
    index => "my_index"
    document_id => "%{group_id}" # each new event of a group updates the previous one
  }
}
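To see why this avoids losing the last group, here is a simplified plain-Ruby simulation of the filter plus upsert. The index hash is only a stand-in for Elasticsearch (an assumption for illustration), and the sample events are hypothetical.

```ruby
# Stand-in for the Elasticsearch index: document_id => document body.
index = {}
values = []
previous_group = nil

# Hypothetical sample events in arrival order.
events = [
  { 'group_id' => 'id1', 'value' => 'valueA' },
  { 'group_id' => 'id1', 'value' => 'valueB' },
  { 'group_id' => 'id2', 'value' => 'valueAA' }
]

events.each do |event|
  # Reset the running list when the group changes (same rule as the filter).
  values = [] if previous_group && previous_group != event['group_id']
  values << event['value']
  previous_group = event['group_id']
  # Upsert: writing under the same document_id replaces the earlier,
  # shorter list, so each group's document always holds its latest list.
  index[event['group_id']] = { 'group_id' => event['group_id'], 'values' => values.dup }
end

index.each_value { |doc| puts doc }
```

The trade-off is one write per event instead of one per group, which is the price for never needing to know which event is the last one.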

Hope it helps someone.