Aggregation through Logstash


(Alok) #1

Dear experts,
I am using the aggregation below to add child-table entries to their parent ids:

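aggregate {
  # push_previous_map_as_event expects rows ordered by id, and the aggregate
  # filter needs Logstash to run with a single pipeline worker (-w 1).
  task_id => "%{id}"
  code => "
    map['id'] = event.get('id')
    map['sizes'] ||= []
    map['sizes'] << {'width' => event.get('width'),
                     'height' => event.get('height'),
                     'weight' => event.get('weight')}
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 3
}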
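To see what the filter produces, here is a plain-Ruby sketch that mimics the map logic outside Logstash. It assumes, as push_previous_map_as_event does, that rows arrive ordered by id; the `aggregate_rows` helper and the sample rows are hypothetical.

```ruby
# Hypothetical stand-in for the aggregate filter: one map per task_id (id).
# When the id changes, the previous map is emitted as a new event, which is
# what push_previous_map_as_event => true does. Rows must be sorted by id.
def aggregate_rows(rows)
  events = []
  current_id = nil
  map = nil
  rows.each do |row|
    if row['id'] != current_id
      events << map if map   # push the previous map as an event
      current_id = row['id']
      map = {}
    end
    map['id'] = row['id']
    map['sizes'] ||= []
    map['sizes'] << { 'width' => row['width'],
                      'height' => row['height'],
                      'weight' => row['weight'] }
    # the child-row event itself is dropped (event.cancel() in the filter)
  end
  events << map if map       # in Logstash the last map is pushed on timeout
  events
end

rows = [
  { 'id' => 1, 'width' => 10, 'height' => 20, 'weight' => 3 },
  { 'id' => 1, 'width' => 15, 'height' => 25, 'weight' => 4 },
  { 'id' => 2, 'width' => 5,  'height' => 5,  'weight' => 1 }
]
events = aggregate_rows(rows)
p events.map { |e| [e['id'], e['sizes'].size] }  # => [[1, 2], [2, 1]]
```

Each parent id ends up as one event whose sizes array holds only the child rows seen in that run, which matters once the input starts fetching deltas.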

This works fine with the following output block:

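output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "products_details"
    document_type => "products_details"
    document_id => "%{id}"
    # With action left at its default ("index"), the document with this id
    # is replaced as a whole; doc_as_upsert and script_type only take effect
    # with action => "update".
    #action => "update"
    doc_as_upsert => true
    script_type => "inline"
  }
}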

The problem started when I added the scheduling options to the JDBC input block:

use_column_value => true
tracking_column => "size_id"
statement_filepath => "C:/Elastic/Schedule_last_value_size/sizes.sql"
schedule => "*/5 * * * *"

Whenever a new row is added to the child table for a given parent id, all the previous entries for that parent are replaced by the new one.
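With use_column_value => true and tracking_column => "size_id", the jdbc input remembers the tracking column value in :sql_last_value, so a statement that filters on size_id > :sql_last_value only returns rows added since the previous run. The contents of sizes.sql are not shown, so that filter is an assumption. The Ruby sketch below models two scheduled runs over a hypothetical child table; `scheduled_run` is illustrative, not part of Logstash.

```ruby
# Hypothetical child table: parent id, tracking column size_id, item.
child_rows = [
  { 'id' => 101, 'size_id' => 1, 'item' => 'i001 watch' },
  { 'id' => 101, 'size_id' => 2, 'item' => 'i002 suitcase' }
]

# One scheduled run: return only rows whose size_id is greater than the
# last tracked value, plus the value to remember for the next run
# (taken from the last row read, as :sql_last_value would be).
def scheduled_run(table, last_value)
  fetched = table.select { |r| r['size_id'] > last_value }
                 .sort_by { |r| r['size_id'] }
  new_last = fetched.empty? ? last_value : fetched.last['size_id']
  [fetched, new_last]
end

first, last_value = scheduled_run(child_rows, 0)  # 0: starting value for a numeric column
child_rows << { 'id' => 101, 'size_id' => 3, 'item' => 'i003 boots' }
delta, last_value = scheduled_run(child_rows, last_value)

p first.map { |r| r['item'] }  # => ["i001 watch", "i002 suitcase"]
p delta.map { |r| r['item'] }  # => ["i003 boots"]
p last_value                   # => 3
```

The second run hands the aggregate filter a single child row, so the aggregated event for id 101 contains only that row.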

Any pointers on where I might be going wrong? Is there a workaround?

Alok


#2

With document_id set and the action left at its default (index), that output tells Elasticsearch to create the document with that id if necessary, or else overwrite it. Are you trying to append fields to an existing document in Elasticsearch?


(Alok) #3

You are right.

Let me explain. Let's say I have the following document:

PO number : 101
Delivery date : Some Date
Total : Some amount
items :
i001 watch
i002 suitcase

Now another item, i003 boots, is added to the same PO in my backend table, and Logstash fetches this new record. As you correctly pointed out, it replaces i001 and i002, leaving only i003 boots.

But I would like to have all three items together under the same PO number.
How do we achieve this?
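The scenario above can be modelled with a Ruby Hash standing in for the index. This is a hypothetical model, not the Elasticsearch client API: `index_doc` replaces the whole document, as the default index action does with a fixed document_id, while `append_items` keeps what is stored and appends the new entries, which is the behaviour being asked for.

```ruby
# Hypothetical model: a Hash stands in for the products_details index
# (document id => _source). This is not the Elasticsearch client API.

# Default "index" action with a fixed document_id: the whole document is replaced.
def index_doc(store, doc)
  store[doc['id']] = doc
end

# Desired behaviour: keep the items already stored and append the new ones.
def append_items(store, doc)
  existing = store[doc['id']]
  if existing.nil?
    store[doc['id']] = doc  # nothing stored yet: insert, like an upsert
  else
    existing['items'] = existing['items'] + doc['items']
  end
end

# Builds a fresh aggregated event for PO 101 with the given items.
def po(*items)
  { 'id' => 101, 'items' => items }
end

overwritten = {}
index_doc(overwritten, po('i001 watch', 'i002 suitcase'))  # initial load
index_doc(overwritten, po('i003 boots'))                   # delta run sees only the new row
p overwritten[101]['items']  # => ["i003 boots"]

merged = {}
append_items(merged, po('i001 watch', 'i002 suitcase'))
append_items(merged, po('i003 boots'))
p merged[101]['items']       # => ["i001 watch", "i002 suitcase", "i003 boots"]
```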

Alok


#4

I would have suggested an approach like this, but that does not work.

Instead of writing to Elasticsearch from Logstash, you could write the output to a file in a format suitable for the bulk and update APIs, then load that file into Elasticsearch using curl.
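As a rough sketch of that suggestion, the Ruby script below turns aggregated events into a _bulk request body. Each event becomes an update action whose painless script appends the new sizes to the existing array, and upsert supplies the whole document when the id does not exist yet. The `bulk_update_lines` helper, the sample event and the file name bulk_updates.ndjson are hypothetical. On versions that still use mapping types (the config above sets document_type), the action line may also need a _type.

```ruby
require 'json'

# Builds a _bulk request body (NDJSON): one action line plus one body line
# per event. The painless script appends the event's sizes to the stored
# array; "upsert" is indexed as-is when the document does not exist yet.
def bulk_update_lines(index, events)
  events.flat_map do |event|
    action = { 'update' => { '_index' => index, '_id' => event['id'].to_s } }
    body = {
      'script' => {
        'lang' => 'painless',
        'source' => 'if (ctx._source.sizes == null) { ctx._source.sizes = [] } ' \
                    'ctx._source.sizes.addAll(params.sizes)',
        'params' => { 'sizes' => event['sizes'] }
      },
      'upsert' => event
    }
    [action.to_json, body.to_json]
  end.join("\n") + "\n"  # the bulk API expects a trailing newline
end

events = [{ 'id' => 101, 'sizes' => [{ 'width' => 10, 'height' => 20, 'weight' => 3 }] }]
File.write('bulk_updates.ndjson', bulk_update_lines('products_details', events))
```

The file could then be loaded with something like curl -H "Content-Type: application/x-ndjson" -XPOST "http://localhost:9200/_bulk" --data-binary @bulk_updates.ndjson. Because the append happens inside Elasticsearch, each run only needs to send the new child rows.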


(Alok) #5

So this gets uglier.
I have just started with Elastic and thought Logstash would help me schedule delta updates too.

Thanks for your quick reply.


(Alok) #6

As a workaround, I have modified the select statement so that it now fetches all the item records for the given PO and runs the aggregation on the whole set.
Previously I was fetching only the new child record.
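The workaround can be sketched in Ruby: find the parents that received new child rows, re-read every child row for those parents, and rebuild each document in full, so overwriting it in Elasticsearch no longer drops older items. In SQL this would roughly mean selecting child rows whose parent id is among those with size_id > :sql_last_value; the actual statement is not shown, so the helpers and data below are hypothetical.

```ruby
# Parents that have at least one child row newer than the last tracked value.
def changed_parent_ids(table, last_value)
  table.select { |r| r['size_id'] > last_value }.map { |r| r['id'] }.uniq
end

# Rebuild each affected parent from *all* of its child rows, not just the new ones.
def rebuild_documents(table, parent_ids)
  parent_ids.map do |pid|
    rows = table.select { |r| r['id'] == pid }.sort_by { |r| r['size_id'] }
    { 'id' => pid, 'items' => rows.map { |r| r['item'] } }
  end
end

# Hypothetical child table.
table = [
  { 'id' => 101, 'size_id' => 1, 'item' => 'i001 watch' },
  { 'id' => 101, 'size_id' => 2, 'item' => 'i002 suitcase' },
  { 'id' => 102, 'size_id' => 3, 'item' => 'i010 belt' },
  { 'id' => 101, 'size_id' => 4, 'item' => 'i003 boots' }  # added after the last run
]

docs = rebuild_documents(table, changed_parent_ids(table, 3))
p docs.map { |d| d['id'] }  # => [101]
p docs.first['items']       # => ["i001 watch", "i002 suitcase", "i003 boots"]
```

The trade-off is that each run re-reads every child row of a changed parent, but the existing index-and-overwrite output then works unchanged.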

