Logstash JDBC document_id not reading latest

Hello there,

I am referring to this article https://www.elastic.co/blog/logstash-jdbc-input-plugin to ingest data from an Oracle table. I need to pull Oracle result sets from some huge queries, but before I do that I am playing with a small dataset to make sure the config works.

What I want

  1. Whenever I update an existing row in the DB, I want the corresponding Elasticsearch document updated in place. I don't want a new document created with the updated fields.
  2. Whenever I insert a new row in the DB, I want that new row to appear in Elasticsearch.

What's not working

  1. I inserted 3 rows in the DB to start with, so I should get 3 documents in Elasticsearch, but I am getting only one.
  2. I inserted a new row in the DB, but I cannot see the newly inserted row in Elasticsearch.

What is working

  1. When I update the row in the DB (the one row that did make it into Elasticsearch), I can see the changes reflected.

I am using document_id => "%{uid}" based on the above article, but I do not have a "uid" column in my Oracle table. The example in the article does not have uid as a DB column either.
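My table does have a primary key (I'll call it ID here; the name is just a placeholder for this post). I suspect that because there is no uid field on the event, "%{uid}" never resolves, so every row is indexed with the literal id %{uid} and overwrites the previous one. Would aliasing the primary key in the SELECT, so that %{uid} resolves to a real field, be the right fix? Something like:

```
# ID is a placeholder for my real primary-key column. UID is a reserved
# word in Oracle, hence the quoted alias; the jdbc input lowercases
# column names by default, so the field arrives as "uid" on the event.
statement => 'SELECT ID AS "uid", FIRSTNAME, LASTNAME, AGE, DOB, CITY, CREATED_DATE, UPDATED_DATE FROM TEST'
```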

Below is my config, which runs every 2 minutes. Could someone please help me fix this?

```
input {
    jdbc {
        type => "temp"
        jdbc_validate_connection => true
        jdbc_connection_string => "my_connection_string"
        jdbc_user => "name"
        jdbc_password => "pwd"
        jdbc_driver_library => "opt/jdbc/lib/ojdbc7.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_default_timezone => "America/Chicago"
        statement => "SELECT FIRSTNAME, LASTNAME, AGE, DOB, CITY, CREATED_DATE, UPDATED_DATE FROM TEST"
        schedule => "*/2 * * * *"
    }
}
output {
    elasticsearch {
        hosts => ["x.x.x.x:9200"]
        manage_template => true
        index => "<%{type}-{now/d}>"
        document_id => "%{uid}"
    }
}
```

Hello there, can someone please help and suggest a fix?

@guyboertje thanks for updating the post to show the config in a readable format. How did you do that?

Also, do you know if someone can help me here?

Before you send data to ES, I suggest that you experiment with the stdout output instead:

```
output { stdout { codec => rubydebug } }
```

You should then see your inserts and updates every two minutes.
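If you want to keep indexing while you watch events, you can also run stdout alongside your existing elasticsearch output rather than replacing it; a minimal sketch based on the config you posted:

```
output {
    # print every event to the console so you can inspect its fields
    # (including whether a real "uid" value is actually present)
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => ["x.x.x.x:9200"]
        manage_template => true
        index => "<%{type}-{now/d}>"
        document_id => "%{uid}"
    }
}
```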

However, to truly achieve your first ask, you will need the following (a config sketch tying these steps together follows the list):

  1. a unique document_id from your DB.
  2. a fingerprint or hashid filter-generated field, say fingerprint, that captures the current state of the record.
  3. an elasticsearch filter that queries the target ES index by document_id to see whether the document already exists, and copies its stored fingerprint into a field existing_fingerprint on the current event.
  4. filter logic to drop the event if fingerprint and existing_fingerprint are equal (you already have this record).
  5. filter logic to add a metadata field called action set to update when existing_fingerprint is present and not equal to fingerprint, with an else clause that sets action to index (the elasticsearch output has no "insert" action; index creates or overwrites the document).
  6. action => "%{[@metadata][action]}" in your elasticsearch output.
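Here is a minimal sketch of steps 2 through 6, assuming step 1 is solved and every event carries a unique uid field. The fingerprinted columns (lowercased by the jdbc input), the temp-* index pattern, the hosts, and the HMAC key are placeholders you would adapt to your setup:

```
filter {
    # 2. fingerprint the columns that define the record's state
    fingerprint {
        source => ["firstname", "lastname", "age", "dob", "city", "updated_date"]
        concatenate_sources => true
        method => "SHA256"
        key => "any-static-string"   # any constant; used for the HMAC
        target => "fingerprint"
    }

    # 3. look up the existing document by id and copy its stored
    #    fingerprint onto this event as existing_fingerprint
    elasticsearch {
        hosts => ["x.x.x.x:9200"]
        index => "temp-*"            # must cover all your daily indices
        query => "_id:%{uid}"
        fields => { "fingerprint" => "existing_fingerprint" }
    }

    # 4. drop the event if nothing changed since it was last indexed
    if [existing_fingerprint] and [existing_fingerprint] == [fingerprint] {
        drop { }
    }

    # 5. route changed records to an update, new records to an index
    if [existing_fingerprint] {
        mutate { add_field => { "[@metadata][action]" => "update" } }
    } else {
        mutate { add_field => { "[@metadata][action]" => "index" } }
    }
}
output {
    # 6. let the metadata field drive the Elasticsearch action
    elasticsearch {
        hosts => ["x.x.x.x:9200"]
        index => "<%{type}-{now/d}>"
        document_id => "%{uid}"
        action => "%{[@metadata][action]}"
    }
}
```

One wrinkle with your daily <%{type}-{now/d}> index naming: an update by document_id only targets today's index, so a row first indexed on an earlier day won't be found there. A single index (or an alias) per table sidesteps this.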

What you really need, and what we don't have (but are gathering info about), is Change Data Capture. Each DB technology does CDC differently, so it's not something we can shoehorn into the JDBC input.

The formatting change is to enclose the code in Markdown triple backticks.
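For example, writing this in the post editor:

````
```
input { ... }
```
````

renders the enclosed text as a formatted code block.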

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.