Splitting multiple arrays in Logstash to create multi events

Hi all,
I have some array fields of different types, as follows:
f1=[0,1,2,3,4]
f2=[a1,a2,a3,a4,a5]
f3=[06/24/2019 06:02:00.000,06/24/2019 07:02:00.000,06/24/2019 08:02:00.000,06/24/2019 09:02:00.000,06/24/2019 05:02:00.000]
f4=[8675765,9766686,79879877,98798987,987879879]

I want to split these arrays so that a new event is created for each element, as follows:

event 1:
f1=0
f2=a1
f3=06/24/2019 06:02:00.000
f4=8675765

event 2:
f1=1
f2=a2
f3=06/24/2019 07:02:00.000
f4=9766686

event 3:
f1=2
f2=a3
f3=06/24/2019 08:02:00.000
f4=79879877

and so on.

How can I do that? Could you please advise me?

Assuming they always have the same length:

  1. Loop through them to create an array that contains one entry with every first value, one entry with every second value, etc.
  2. Make sure the result is an array, not a hash. (I had a problem here. Maybe my solution is not the most elegant code :wink:)
  3. Remove the old data.
  4. Use the split filter to get one event for each of the created array entries.
  5. Bring the fields from the split field to the root level of the event.

I improvised the example code from one of my pipelines but didn't test it, so there might be mistakes. The basic idea should work, though.

ruby {
  code => "
    # step 1
    event.get('f1').each_with_index do |value, key|
      event.set('[merged_data]['+key.to_s+'][f1]', value)
      event.set('[merged_data]['+key.to_s+'][f2]', event.get('[f2]['+key.to_s+']'))
      event.set('[merged_data]['+key.to_s+'][f3]', event.get('[f3]['+key.to_s+']'))
      event.set('[merged_data]['+key.to_s+'][f4]', event.get('[f4]['+key.to_s+']'))
    end
    # step 2
    event.set('merged_data', event.get('merged_data').values)
    # step 3
    event.remove('f1')
    event.remove('f2')
    event.remove('f3')
    event.remove('f4')
  "
}
# step 4
split {
  field => "merged_data"
}
# step 5
ruby {
  code => "
    event.get('merged_data').each { |k, v| event.set(k, v) }
    event.remove('merged_data')
  "
}
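To see why step 2 is needed: setting paths like `[merged_data][0][f1]` through the Event API builds a hash keyed by the index strings, not an array, and the split filter needs an array. A plain-Ruby sketch of steps 1 and 2 (hypothetical local variables, not the Logstash Event API itself):

```ruby
# Sample parallel arrays, as in the question.
f1 = [0, 1, 2, 3, 4]
f2 = ["a1", "a2", "a3", "a4", "a5"]

# Step 1: build a hash keyed by the stringified index, mirroring what
# event.set('[merged_data]['+key.to_s+'][f1]', ...) does internally.
merged = {}
f1.each_with_index do |value, key|
  merged[key.to_s] ||= {}
  merged[key.to_s]["f1"] = value
  merged[key.to_s]["f2"] = f2[key]
end

# Step 2: .values turns the index-keyed hash into an array of hashes,
# which is the shape the split filter expects.
merged_data = merged.values
```

After this, `merged_data` is `[{ "f1" => 0, "f2" => "a1" }, ...]`, one hash per future event.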

Alternatively

input { generator { count => 1 lines => [ '' ] } }

filter {
    mutate { add_field => {
        "f1" => [0,1,2,3,4]
        "f2" => ["a1","a2","a3","a4","a5"]
        "f3" => ["06/24/2019 06:02:00.000","06/24/2019 07:02:00.000","06/24/2019 08:02:00.000","06/24/2019 09:02:00.000","06/24/2019 05:02:00.000"]
        "f4" => [8675765,9766686,79879877,98798987,987879879]
    } }
    ruby {
        code => '
            f1 = event.get("f1")
            f2 = event.get("f2")
            f3 = event.get("f3")
            f4 = event.get("f4")
            a = []
            f1.each_index { |i|
                h = {}
                h["f1"] = f1[i]
                h["f2"] = f2[i]
                h["f3"] = f3[i]
                h["f4"] = f4[i]
                a << h
            }
            event.set("[@metadata][a]", a)
        '
        remove_field => [ "f1", "f2", "f3", "f4" ]
    }
    split { field => "[@metadata][a]" }
    ruby { code => 'event.get("[@metadata][a]").each { |k, v| event.set(k,v) }' }
}
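The per-index loop in that ruby filter is a transpose of the parallel arrays; in plain Ruby the same grouping can be written more compactly with `Array#zip`. A sketch with hypothetical local variables (inside a filter you would still read and write them via `event.get`/`event.set`):

```ruby
# Shortened sample data from the question.
f1 = [0, 1, 2]
f2 = ["a1", "a2", "a3"]
f4 = [8675765, 9766686, 79879877]

# zip pairs the i-th elements of each array; each row then becomes one
# hash, which is exactly what the split filter turns into one event.
a = f1.zip(f2, f4).map { |v1, v2, v4| { "f1" => v1, "f2" => v2, "f4" => v4 } }
```

This only works when all the arrays have the same length, the same assumption the answer above already makes.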

Many thanks, it works!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.