How do I generate a unique fingerprint for every record retrieved via the JDBC plugin? For example, I am retrieving a list of invoice line items via a JDBC plugin and using an aggregate filter to group those line items by the invoice number. Before I index the document in Elasticsearch, I would like to assign a unique fingerprint id as the document_id.
The issue is, if I use a fingerprint filter before the aggregate filter, then the generated fingerprint value is not available in either the event object or the @metadata object within the output section. If I use a fingerprint filter after the aggregate, then the same fingerprint value gets assigned to every document_id field.
It may be helpful to share your pipeline configuration.
If the same record is retrieved twice, do you expect the same fingerprint, or a different one?
The aggregate filter is a little magical, in that it "swallows" many events, correlates them on the given task id, uses your code to set and manipulate values in a map for that particular task, and then eventually emits a new event once a "task" is deemed complete (that is, when it sees a "final" event or reaches a timeout).
As such, it requires that you copy over the bits you want and put them in the map; while many individual events go into the aggregate filter, only one event comes out per ending event (or timeout).
Again, I would need to see how your fingerprint is defined; if the event emitted when the aggregate filter closes a task does not have the key you fingerprint on, we would effectively be doing a consistent hash of a consistent (empty) value, resulting in a single consistent key.
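As a rough illustration (the field names below, invoicePK, lineItem, and lineItems, are assumptions, not taken from your pipeline), the important part is that anything you want on the emitted event has to be written into the map inside the aggregate's code block, and the fingerprint then has to be computed from a field the emitted event actually carries:

```
filter {
  aggregate {
    task_id => "%{invoicePK}"
    code => "
      # anything not copied into the map is lost when the aggregated event is emitted
      map['invoicePK'] ||= event.get('invoicePK')
      map['lineItems'] ||= []
      map['lineItems'] << event.get('lineItem')
      event.cancel()  # drop the per-row event; only the aggregated event should be indexed
    "
    push_previous_map_as_event => true
    timeout => 30
  }

  # runs on the aggregated event, so it sees the invoicePK that was copied into the map
  fingerprint {
    source => "invoicePK"
    target => "fingerprint"
    method => "SHA256"
    key => "any-constant-key"  # with key set, an HMAC digest is used
  }
}
```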
Here is the conf file. I want to generate a fingerprint for each invoice document based on its PK and assign that fingerprint value to the document_id.
What about the provided configuration isn't working? What output do you expect to see, and how does what you get differ? When I run something similar, I see a bunch of events with ids copied from the generated fingerprints.
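For reference, the outer shape of what I ran looks roughly like the following, with the aggregate and fingerprint filters as in the earlier sketch; the connection string, credentials, query, and index name here are placeholders, not your actual configuration:

```
input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/billing"
    jdbc_user => "logstash"
    jdbc_password => "secret"
    jdbc_driver_class => "org.postgresql.Driver"
    # keep column-name casing so the filters can reference invoicePK directly
    lowercase_column_names => false
    # ORDER BY matters: push_previous_map_as_event assumes rows for one invoice arrive contiguously
    statement => "SELECT invoicePK, lineItem FROM invoice_line_items ORDER BY invoicePK"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "invoices"
    document_id => "%{fingerprint}"
  }
}
```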
A couple notes from my brief dive into the aggregate filter:
The push_previous_map_as_event setting requires that the events coming back are very explicitly ordered -- that is, once you get a row with a different value for invoicePK, you'll never get any more rows with the previous value.
If you've got more than one pipeline worker, or are running without ordering explicitly constrained in Logstash, you're going to have a bad time; this plugin is very tightly bound to sequence and ordering.
With the above config, I get the same fingerprint for multiple invoices. When I look at the map attributes, the 'fingerprint' key has the same value for all documents but the 'pk' key has different values. I expect to see a different fingerprint value for each invoice. I forgot to mention a line in the output section: I do have document_id => "%{fingerprint}" in the elasticsearch output. Only one document is getting indexed because every document ends up with the same fingerprint value.
When you run it, does each document have a unique fingerprint? In my case, I get the same fingerprint for all documents; it seems the fingerprint routine only gets called once.
Thanks, I finally get a distinct fingerprint for each invoice. But now I see a different problem. Despite having an ORDER BY INVPK clause in my query, the aggregate filter is not grouping documents by invoicePK.
I have two invoices with three line items each. My query returns the line items ordered by invoicePK. I am expecting two invoice documents to be formed, each with three line items in a lineItem array. Instead, I get four invoice documents: the first has just one line item for PK1, the second has just one line item for PK2, the third has two line items for PK1, and the fourth has two line items for PK2.
Why do you think the aggregate is not working even though the events arrive ordered by invoicePK?
Are you starting Logstash with just one worker thread? The aggregate filter is order-dependent, and order cannot be guaranteed when multiple worker threads are splitting the work.
I had the default of 2 worker threads running. After changing it to 1 thread, it now works as expected. Does that mean an aggregate filter can't take advantage of multi-threading?
Yes; the aggregate filter's documentation clearly calls out that it does not support parallel execution:
You should be very careful to set Logstash filter workers to 1 (-w 1 flag) for this filter to work correctly otherwise events may be processed out of sequence and unexpected results will occur.
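In practice (the config path and pipeline id below are placeholders), that means either passing the flag on the command line or pinning the worker count per pipeline:

```
# command line
bin/logstash -f /etc/logstash/conf.d/invoices.conf -w 1

# or per pipeline in pipelines.yml
- pipeline.id: invoices
  path.config: "/etc/logstash/conf.d/invoices.conf"
  pipeline.workers: 1
```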