Aggregation through Logstash

Dear Experts,

I am using the aggregate filter below to add child-table entries to the matching parent IDs:

aggregate {
  task_id => "%{id}"
  code => "
    map['id'] = event.get('id')
    map['sizes'] ||= []
    map['sizes'] << {'width'  => event.get('width'),
                     'height' => event.get('height'),
                     'weight' => event.get('weight')}
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 3
}
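To illustrate with made-up values: two child rows for the same parent, say (id=1, width=10, height=20, weight=5) and (id=1, width=12, height=22, weight=6), come out of the filter as a single aggregated event roughly like this:

    {
      "id" => 1,
      "sizes" => [
        { "width" => 10, "height" => 20, "weight" => 5 },
        { "width" => 12, "height" => 22, "weight" => 6 }
      ]
    }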

This works fine with the following output block:

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "products_details"
    document_type => "products_details"
    document_id => "%{id}"
    #action => "update"
    doc_as_upsert => true
    script_type => "inline"
  }
}

The problem started when I added the scheduler settings to the JDBC input block:

use_column_value => true
tracking_column => "size_id"
statement_filepath => "C:/Elastic/Schedule_last_value_size/sizes.sql"
schedule => "*/5 * * * *"
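For context, those settings sit inside the jdbc input roughly like this (the connection details are placeholders, not my actual setup):

    input {
      jdbc {
        # placeholder connection details - adjust for your database
        jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
        jdbc_user => "user"
        jdbc_password => "password"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        statement_filepath => "C:/Elastic/Schedule_last_value_size/sizes.sql"
        use_column_value => true
        tracking_column => "size_id"
        tracking_column_type => "numeric"
        schedule => "*/5 * * * *"   # run the statement every 5 minutes
      }
    }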

If a new entry appears in the child table for a given parent ID, all the previous records for that parent get replaced by the new entry.

Any pointers on where I might be going wrong? Any workaround?

Alok

That tells elasticsearch to create, if necessary, or else overwrite the document with that id. Are you trying to append fields to an existing document in elasticsearch?
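As a minimal illustration (index name and fields follow your example; on older Elasticsearch versions the mapping type replaces _doc in the URL), indexing twice with the same id keeps only the second document:

    curl -XPUT 'http://localhost:9200/products_details/_doc/1' \
      -H 'Content-Type: application/json' \
      -d '{ "id": 1, "sizes": [ { "width": 10, "height": 20, "weight": 5 } ] }'

    # Same id again: this replaces the document above rather than merging with it
    curl -XPUT 'http://localhost:9200/products_details/_doc/1' \
      -H 'Content-Type: application/json' \
      -d '{ "id": 1, "sizes": [ { "width": 12, "height": 22, "weight": 6 } ] }'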

You are right.

Let me explain. Let's say I have the following document:

PO number: 101
Delivery date: some date
Total: some amount
Items:
  i001  watch
  i002  suitcase

Now another item, i003 (boots), is added to the same PO in my backend table, and Logstash fetches this new record. As you correctly mentioned, it will replace i001 and i002 and keep only i003 (boots).

But I would like to have all three items together under the same PO number.
How do we achieve this?

Alok

I would have suggested an approach like this, but that does not work 🙂

Instead of writing to elasticsearch using logstash, you could write the output to a file in a format suitable for the bulk and update APIs, then load the file into elasticsearch using curl.
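For example, each update could be a pair of lines in a file, say updates.ndjson (names here are placeholders following your example; older versions also need a _type in the action line), using a scripted upsert to append to the array instead of replacing it:

    { "update": { "_index": "products_details", "_id": "101" } }
    { "script": { "source": "ctx._source.items.add(params.item)", "params": { "item": "i003 boots" } }, "upsert": { "items": ["i003 boots"] } }

and then load it with:

    curl -XPOST 'http://localhost:9200/_bulk' \
      -H 'Content-Type: application/x-ndjson' \
      --data-binary @updates.ndjson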

So this gets uglier 🙁
I have just started with Elastic. I thought Logstash would help me schedule delta updates too.

Thanks for your quick reply.

As a workaround, I have modified the select statement so that it now fetches all the related item records for the given PO, and the aggregation runs over the whole data set.
Previously I was fetching only the new child record.
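Roughly, the modified statement looks like this (table and column names are simplified placeholders for my actual schema):

    SELECT p.id, s.size_id, s.width, s.height, s.weight
    FROM products p
    JOIN sizes s ON s.product_id = p.id
    WHERE p.id IN (
      -- parents that received a new child row since the last run
      SELECT product_id FROM sizes WHERE size_id > :sql_last_value
    )
    ORDER BY p.id

The ORDER BY keeps rows for the same parent adjacent, which the aggregate filter with push_previous_map_as_event relies on.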
