Fingerprint & Aggregate Filter

How do I generate a unique fingerprint for every record retrieved via the JDBC plugin? For example, I am retrieving a list of invoice line items via the JDBC input plugin and using an aggregate filter to group those line items by invoice number. Before I index the document in Elasticsearch, I would like to assign a unique fingerprint as the document_id.

The issue is, if I use a fingerprint filter before the aggregate filter, then the generated fingerprint value is not available either in the event object or in the @metadata object within the output section. If I use a fingerprint filter after the aggregate, then the same fingerprint value gets assigned to every document_id field.

Any idea??

It may be helpful to share your pipeline configuration.

If the same record is retrieved twice, do you expect the same fingerprint, or a different one?

The aggregate filter is a little magical, in that it "swallows" many events, correlates them on the given task id, uses your code to set and manipulate values in a map for that particular task, and then eventually emits a new event once a "task" is deemed complete (that is, when it sees a "final" event or reaches a timeout).

As such, it requires that you copy over the bits that you want and put them in the map; while many individual events go into the aggregate filter, only one event will come out per ending event (or timeout).
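To make that concrete, here's a minimal sketch; the orderId and line field names are made up for illustration, not taken from any real config. Each row adds to the map for its task id, and the map is pushed out as a new event when the next task id shows up or the timeout fires:

aggregate {
  task_id => "%{orderId}"
  code => "
    # copy over the bits you want the final event to carry
    map['orderId'] ||= event.get('orderId')
    map['lines'] ||= []
    map['lines'] << event.get('line')
    # swallow the individual row
    event.cancel()
  "
  # emit the accumulated map as a new event when a new orderId arrives...
  push_previous_map_as_event => true
  # ...or when no rows show up for 3 seconds
  timeout => 3
}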

Again, I would need to see how your fingerprint is defined; if the event emitted when the aggregate task closes does not have the fingerprint's source key set, we would effectively be hashing a consistent (empty) value, resulting in a single consistent key.

I meant unique for every invoice.

Here is the conf file. I want to generate a fingerprint for each invoice document based on its PK and assign that fingerprint value to the document_id.

Assume 'invoicePK' is one of the returned columns.

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://........"
    # The user we wish to execute our statement as
    jdbc_user => "xxxxxx"
    jdbc_password => "xxxxxxxxx"
    # The path to our downloaded jdbc driver
    jdbc_driver_library => "xxxxxxxx.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    # our query
    statement => "xxxxxxxxxxxxx"
  }
}
filter {

  fingerprint {
    "method" => "SHA1"
    "key" => "dummyPhrase"
    "source" => ['%{invoicePK}']
  }

  aggregate {
    task_id => "%{invoicePK}"
    code =>
    "
      map['id'] = event.get('fingerprint')
      map['pk'] = event.get('invoicePK')
      map['number'] = event.get('number')
      map['date'] = event.get('date')
      map['dueDate'] = event.get('duedate')
      map['total'] = event.get('total')
      map['subTotal'] = event.get('subtotal')
      map['tax'] = event.get('tax')
      map['freight'] = event.get('freight')
      map['discount'] = event.get('discount')
      map['createdOn'] = event.get('createdon')
      map['modifiedOn'] = event.get('modifiedon')

      map['vendor'] ||= {}
      map['vendor']['id'] = event.get('vendorid');
      map['vendor']['name'] = event.get('vendor')

      map['lineItems'] ||= []
      map['lineItems'] << {
        'productId' => event.get('productid'),
        'vendorProductCode' => event.get('vendorproductcode'),
        'category' => event.get('category'),
        'description' => event.get('description'),
        'qty' => event.get('qty'),
        'unitCost' => event.get('unitcost'),
        'total' => event.get('extendedtotal')
      }

      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }

  mutate {
    add_tag => ["logstash 6.2.3"]
    remove_tag => ["_aggregatefinalflush"]
  }
}
output {
  stdout { codec => rubydebug { metadata => true } }

  elasticsearch {
    hosts => "https://localhost"
    index => "invoice"
  }
}

What about the provided configuration isn't working? What output do you expect to see, and how does what you get differ? When I run something similar, I see a bunch of events with ids copied from the generated fingerprints.

A couple notes from my brief dive into the aggregate filter:

  • The push_previous_map_as_event setting requires that the events coming back are very explicitly ordered -- that is, all rows for a given invoicePK must arrive contiguously, and once a row with a new invoicePK shows up, you must never see the previous value again.
  • If you've got more than one pipeline worker, or are running without ordering explicitly constrained in Logstash, you're going to have a bad time; this plugin is very tightly bound to sequence and ordering (see below for how to pin the pipeline to a single worker).
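For reference, two ways to pin the pipeline to a single worker (the pipeline file name is a placeholder):

bin/logstash -w 1 -f your-pipeline.conf

or, in logstash.yml:

pipeline.workers: 1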

With the above config, I get the same fingerprint for multiple invoices. When I look at the map attributes, the 'fingerprint' key has the same value for all documents, but the 'pk' key has different values. I expect to see a different fingerprint value for each invoice. I also forgot to mention a line in the output section: I have document_id => "%{fingerprint}" in the elasticsearch output. Only one document gets indexed because every document ends up with the same fingerprint value.
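That is, the elasticsearch output effectively looks like this:

elasticsearch {
  hosts => "https://localhost"
  index => "invoice"
  document_id => "%{fingerprint}"
}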

All my events are in order, and I will make sure it stays that way. Thanks for clarifying the behavior of the aggregate filter.

When you run it, does each document have a unique fingerprint? In my case, I get the same fingerprint for all documents; it seems the fingerprint routine only gets called once.

AHA!

The source directive of the fingerprint plugin takes a literal field reference, not a sprintf format string:

   fingerprint {
     "method" => "SHA1"
     "key" => "dummyPhrase"
-    "source" => ['%{invoicePK}']
+    "source" => ['invoicePK']
   }

Thanks, I finally get a distinct fingerprint for each invoice. But now I see a different problem: despite having an ORDER BY INVPK clause in my query, the aggregate filter is not grouping documents by invoicePK.
I have two invoices with three line items each. My query returns the list of line items ordered by invoicePK, so I expect two invoice documents, each with three entries in its lineItems array. Instead I get four invoice documents: the first with just one line item for PK1, the second with just one line item for PK2, the third with two line items for PK1, and the fourth with two line items for PK2.

Why do you think the aggregate filter is not working despite the events being ordered by invoicePK?

Are you starting Logstash with just one worker thread? The aggregate filter is order-dependent, and order cannot be guaranteed when multiple worker threads are splitting the work.

I had the default of 2 worker threads running. After changing it to 1 thread, it now works as expected. Does that mean the aggregate filter can't take advantage of multi-threading?

Yes; the aggregate filter clearly calls out in the docs that it does not support parallel execution.

You should be very careful to set Logstash filter workers to 1 (-w 1 flag) for this filter to work correctly otherwise events may be processed out of sequence and unexpected results will occur.

-- Logstash Aggregate Filter docs

Thanks for the info and all the support

