Aggregate filter for mixed data lines

Hello all!

I am trying to use the Aggregate filter plugin to correlate and combine data from two different CSV file inputs that represent API calls. The idea is to produce a single record giving the combined picture. As you'd expect, the data may or may not arrive in the right sequence. Here's an example:

/data/incoming/source_1/*.csv

StartTime, AckTime, Operation, RefData1, RefData2, OpSpecificData1
231313232,44343545,Register,ref-data-1a,ref-data-2a,op-specific-data-1
979898999,75758383,Register,ref-data-1b,ref-data-2b,op-specific-data-2
354656466,98554321,Cancel,ref-data-1c,ref-data-2c,op-specific-data-2

/data/incoming/source_2/*.csv

FinishTime,Operation,RefData1, RefData2, FinishSpecificData
67657657575,Cancel,ref-data-1c,ref-data-2c,FinishSpecific-Data-1
68445590877,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2
55443444313,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2

I have a single pipeline that receives both these CSVs and I am able to process and write them as individual records to a single index. However, the idea is to combine records from the two sources into one record each, representing a superset of Operation-related information.
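
For example, for the Cancel rows in the two samples above, the combined record I'm after would look something like this:

StartTime: 354656466
AckTime: 98554321
FinishTime: 67657657575
Operation: Cancel
RefData1: ref-data-1c
RefData2: ref-data-2c
OpSpecificData1: op-specific-data-2
FinishSpecificData: FinishSpecific-Data-1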

Unfortunately, despite several attempts I have been unable to figure out how to achieve this via the Aggregate filter plugin. My primary question is whether this is a suitable use of this particular plugin? And if so, any suggestions would be welcome!

At the moment, I have this

input {
   file {
      path => ['/data/incoming/source_1/*.csv']
      tags => ["source1"]
   }
   file {
      path => ['/data/incoming/source_2/*.csv']
      tags => ["source2"]
   }
}
filter {
   # use the tags to do some source 1 and 2 related massaging, calculations, etc

   aggregate {
      task_id => "%{Operation}_%{RefData1}_%{RefData2}"
      code => "
          map['source_files'] ||= []
          map['source_files'] << { 'source_file' => event.get('path') }
      "
      push_map_as_event_on_timeout => true
      timeout => 600 # assuming this is the furthest apart they will arrive
   }
   ...
}
output {
    elasticsearch { ... }
}

And other such variations. However, I keep getting individual records written to the index and am unable to get a single combined one. Then again, as you can see from the data set, there's no guarantee of the sequencing of records, so I am wondering if this filter is the right tool for the job to begin with? :roll_eyes:

Or is it just me not being able to use it right! :face_with_hand_over_mouth:

In either case, any inputs/comments/suggestions welcome. Thanks!

Edit: Also cross-posted to Stack Overflow.

The two file input plugins in your Logstash pipeline run in their own threads and do not share events with each other, as explained in the execution model documentation (Execution Model | Logstash Reference [8.11] | Elastic).

To overcome this problem, you could do something like below:

  1. In one pipeline (let's say A), first index the CSVs from /data/incoming/source_1/*.csv into an Elasticsearch index, say indexA.

  2. In another pipeline (let's say B), use the file input plugin to read /data/incoming/source_2/*.csv, use the Elasticsearch filter plugin to query indexA to look up the corresponding values and enrich your events, and then let these get indexed into indexB. indexB is what you are looking for. A rough sketch of pipeline B is shown below.
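
Something along these lines for pipeline B (just a sketch; the host, the CSV parsing, and the exact fields to copy over are assumptions based on the sample data above, so adjust them to your actual mapping):

input {
   file {
      path => ['/data/incoming/source_2/*.csv']
   }
}
filter {
   # parse the source_2 CSV columns here (csv filter), then look up the matching
   # source_1 document previously indexed into indexA and copy its fields onto this event
   elasticsearch {
      hosts => ["my-es-host:9200"]
      index => "indexA"
      query => "Operation:%{Operation} AND RefData1:%{RefData1} AND RefData2:%{RefData2}"
      fields => {
         "StartTime" => "StartTime"
         "AckTime" => "AckTime"
         "OpSpecificData1" => "OpSpecificData1"
      }
   }
}
output {
   # the enriched events, now carrying fields from both sources, go to indexB
   elasticsearch {
      hosts => ["my-es-host:9200"]
      index => "indexB"
   }
}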

That sounds like an idea! Let me give it a shot and come back with results or further questions. :grinning:

[quote="Rahul_Kumar4, post:2, topic:236110"]
use the Elasticsearch filter plugin to query the indexA to look up the corresponding values and enrich your events and then let this get indexed in indexB. indexB is what you are looking for.
[/quote]

In the meanwhile, I spotted this post which shows how to use the Elasticsearch output plugin in upsert mode, which pretty much does what I need. Here's what my output section now looks like:

...
output {

  if "source1" in [tags] {
       elasticsearch { ..} # write to source1 specific index
  }
  else if "source2" in [tags] {
       elasticsearch { ..} # write to source2 specific index
  }
  # and ultimately the combined index
   elasticsearch {
      hosts => ["my-es-host:9200"]
      index => "my-combined-index"
      action => "update"
      document_id => "%{Operation}_%{RefData1}_%{RefData2}"
      doc_as_upsert => true
   }
}

This gives me two choices - wicked!

Yeap, this should work too. You won't have to use the Elasticsearch filter plugin in this case, and you won't have the redundant indexB either, as your combined index will itself get updated.
