Hello all!
I am trying to use the Aggregate filter plugin to correlate and combine data from two different CSV file inputs which represent API data calls. The idea is to produce a record showing a combined picture. As you can expect the data may or may not arrive in the right sequence. Here's an example:
/data/incoming/source_1/*.csv
StartTime, AckTime, Operation, RefData1, RefData2, OpSpecificData1
231313232,44343545,Register,ref-data-1a,ref-data-2a,op-specific-data-1
979898999,75758383,Register,ref-data-1b,ref-data-2b,op-specific-data-2
354656466,98554321,Cancel,ref-data-1c,ref-data-2c,op-specific-data-2
/data/incoming/source_1/*.csv
FinishTime,Operation,RefData1, RefData2, FinishSpecificData
67657657575,Cancel,ref-data-1c,ref-data-2c,FinishSpecific-Data-1
68445590877,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2
55443444313,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2
I have a single pipeline that is receiving both these CSVs and I am able to process and write them as individual records to a single Index. However, the idea is to combine records from the two sources into one record each representing a superset. of Operation related information
Unfortunately, despite several attempts I have been unable to figure out how to achieve this via Aggregate filter plugin. My primary question is whether this is a suitable use of the specific plugin? And if so, any suggestions would be welcome!
At the moment, I have this
input {
file {
path => ['/data/incoming/source_1/*.csv']
tags => ["source1"]
}
file {
path => ['/data/incoming/source_2/*.csv']
tags => ["source2"]
}
# use the tags to do some source 1 and 2 related massaging, calculations, etc
aggregate {
task_id = "%{Operation}_%{RefData1}_%{RefData1}"
code => "
map['source_files'] ||= []
map['source_files'] << {'source_file', event.get('path') }
"
push_map_as_event_on_timeout => true
timeout => 600 #assuming this is the most far apart they will arrive
}
...
}
output {
elastic { ...}
}
And other such variations. However, I keep getting individual records being written to the Index and am unable to get one combined. Yet again, as you can see from the data set there's no guarantee of the sequencing of records - so I am wondering if the filter is the right tool for the job, to begin with?
Or is it just me not being able to use it right!
In either case, any inputs/ comments/ suggestions welcome. Thanks!
Edit: Also cross-posted to Stackoverflow.