JDBC to Elasticsearch: duplicate records?


(Jan) #1

Hi!

I'm working on a PoC in which we are evaluating how to ingest data from an Oracle table into Elasticsearch using Logstash with multiple parallel connections to the Oracle DB.

While doing this I observed a strange behaviour that I can't explain. Maybe one of you can point out what I am misunderstanding.

When I do a full load of a table from the Oracle DB into an empty Elasticsearch index, Logstash seems to write certain records multiple times.

To be more specific: the Oracle table contains 7,656,494 records. When the load is done, the Elasticsearch index contains 7,656,494 documents, but also 34,663 deleted documents (this number varies when I rerun the whole process). The only explanation I have for the deleted documents is that Logstash loads some records into the index more than once?!?
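A toy model may help connect "deleted documents" to "records loaded more than once". The pipeline config further down sets document_id => "%{ID}", so indexing the same row twice targets the same document; Elasticsearch (through Lucene) then marks the previous version as deleted instead of adding a second live document. The function below is a hypothetical illustration of that bookkeeping, not Elasticsearch code, and it ignores segment merges, which purge deleted documents over time. Because merges only ever lower the deleted count, 34,663 deleted documents would mean at least 34,663 index operations hit an ID that already existed.

```python
from typing import Dict, Iterable, Tuple


def index_documents(doc_ids: Iterable[int]) -> Tuple[int, int]:
    """Toy model of indexing with an explicit _id (document_id => "%{ID}").

    Hypothetical illustration: re-indexing an existing _id replaces the
    live document and leaves the old version behind as a deleted doc.
    Segment merges (which purge deleted docs) are not modelled.

    Returns (docs_count, docs_deleted).
    """
    live: Dict[int, int] = {}  # _id -> current version number
    deleted = 0
    for doc_id in doc_ids:
        if doc_id in live:
            deleted += 1  # previous version becomes a deleted document
        live[doc_id] = live.get(doc_id, 0) + 1
    return len(live), deleted


# IDs 2 and 3 arrive twice: still 3 live documents, plus 2 deleted ones.
print(index_documents([1, 2, 3, 2, 3]))  # (3, 2)
```

This matches the symptom in the post: the live document count equals the row count, and only the deleted count reveals the extra writes.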

My whole setup runs in Docker containers on a single machine. I use a self-built Logstash image, but it only adds two plugins. Here is the Dockerfile:

FROM docker.elastic.co/logstash/logstash-oss:6.4.1
RUN bin/logstash-plugin install logstash-filter-prune
RUN bin/logstash-plugin install logstash-output-jdbc

This is the DDL for the Oracle table:

CREATE TABLE POC.UMSATZKOMPAKT (
 ID integer GENERATED BY DEFAULT AS IDENTITY (START WITH 1) NOT NULL PRIMARY KEY,
 FIELD1 VARCHAR2(1024 CHAR),
 FIELD2 VARCHAR2(1024 CHAR),
 FIELD3 VARCHAR2(1024 CHAR),
 INGEST_TS TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
);

CREATE INDEX POC.IDX_INGEST_TS on POC.UMSATZKOMPAKT(INGEST_TS);

This is the pipeline config:

# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
    jdbc {
        statement => "SELECT ID, FIELD1, FIELD2, FIELD3 FROM POC.UMSATZKOMPAKT WHERE ID > :sql_last_value AND mod(to_number(to_char(ingest_ts, 'ss')),4) = 0"
        use_column_value => true
        tracking_column => "ID"
        jdbc_driver_library => "/usr/share/logstash/config/ojdbc6.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:xe"
        jdbc_user => "poc"
        jdbc_password => "poc"
        schedule => "*/1 * * * * *"
        last_run_metadata_path => "/home/ph/.logstash_jdbc_last_run_1"
        lowercase_column_names => false
        tags => ["oracle"]
    }
    jdbc {
        statement => "SELECT ID, FIELD1, FIELD2, FIELD3 FROM POC.UMSATZKOMPAKT WHERE ID > :sql_last_value AND mod(to_number(to_char(ingest_ts, 'ss')),4) = 1"
        use_column_value => true
        tracking_column => "ID"
        jdbc_driver_library => "/usr/share/logstash/config/ojdbc6.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:xe"
        jdbc_user => "poc"
        jdbc_password => "poc"
        schedule => "*/1 * * * * *"
        last_run_metadata_path => "/home/ph/.logstash_jdbc_last_run_2"
        lowercase_column_names => false
        tags => ["oracle"]
    }
    jdbc {
        statement => "SELECT ID, FIELD1, FIELD2, FIELD3 FROM POC.UMSATZKOMPAKT WHERE ID > :sql_last_value AND mod(to_number(to_char(ingest_ts, 'ss')),4) = 2"
        use_column_value => true
        tracking_column => "ID"
        jdbc_driver_library => "/usr/share/logstash/config/ojdbc6.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:xe"
        jdbc_user => "poc"
        jdbc_password => "poc"
        schedule => "*/1 * * * * *"
        last_run_metadata_path => "/home/ph/.logstash_jdbc_last_run_3"
        lowercase_column_names => false
        tags => ["oracle"]
    }
    jdbc {
        statement => "SELECT ID, FIELD1, FIELD2, FIELD3 FROM POC.UMSATZKOMPAKT WHERE ID > :sql_last_value AND mod(to_number(to_char(ingest_ts, 'ss')),4) = 3"
        use_column_value => true
        tracking_column => "ID"
        jdbc_driver_library => "/usr/share/logstash/config/ojdbc6.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:xe"
        jdbc_user => "poc"
        jdbc_password => "poc"
        schedule => "*/1 * * * * *"
        last_run_metadata_path => "/home/ph/.logstash_jdbc_last_run_4"
        lowercase_column_names => false
        tags => ["oracle"]
    }

}
# The filter part of this file is commented out to indicate that it is
# optional.
#filter {
#    ruby { 
#        code => "event.set('logstash_processed_at', Time.now());"
#    } 
#    metrics {
#        meter => "events"
#        add_tag => "metric"
#    }
#}
output {
#   stdout { codec => rubydebug }  
#    if "metric" in [tags] {
#       stdout { codec => rubydebug }
#    }
    if "oracle" in [tags] {
        elasticsearch {
            hosts => [ "localhost:9200" ] 
            index => "test2"
            document_id => "%{ID}"
        }
    }  
}
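As a sanity check on how the four jdbc inputs split the table, the sketch below mirrors the condition mod(to_number(to_char(ingest_ts, 'ss')),4) used in the statements above. The seconds field of INGEST_TS is always 0..59, so every row matches exactly one of the four statements. Assuming INGEST_TS is never updated after insert, two inputs cannot select the same row, which suggests the duplicates come from a single input selecting a row more than once. The function name input_for_row is hypothetical.

```python
from collections import Counter


def input_for_row(ingest_second: int, inputs: int = 4) -> int:
    """Return the remainder that selects a row, mirroring
    mod(to_number(to_char(ingest_ts, 'ss')), 4) in the jdbc statements.

    to_char(ts, 'ss') yields the seconds field "00".."59", so the result
    is always in 0..inputs-1 (hypothetical helper, for illustration only).
    """
    if not 0 <= ingest_second <= 59:
        raise ValueError("seconds field must be in 0..59")
    return ingest_second % inputs


# Every possible seconds value lands in exactly one of the four inputs,
# so the partitions are disjoint and together cover the whole table.
counts = Counter(input_for_row(s) for s in range(60))
print(sorted(counts.items()))  # [(0, 15), (1, 15), (2, 15), (3, 15)]
```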

And this is how I start the Logstash container:

docker run --rm -it \
-e "PIPELINE_WORKERS=4" \
-e "LS_JAVA_OPTS=-Xms10g -Xmx10g" \
-e "LOG_LEVEL=info" \
--name="logstash_oracle" \
--network="host" \
-v /home/data/pipeline/logstash/pipeline_oracle_multithread/:/usr/share/logstash/pipeline/ \
-v /home/data/pipeline/logstash/jdbc_oracle/ojdbc6.jar:/usr/share/logstash/config/ojdbc6.jar \
-v /home/data/pipeline/logstash/jdbc_oracle/.logstash_jdbc_last_run_1:/home/ph/.logstash_jdbc_last_run_1 \
-v /home/data/pipeline/logstash/jdbc_oracle/.logstash_jdbc_last_run_2:/home/ph/.logstash_jdbc_last_run_2 \
-v /home/data/pipeline/logstash/jdbc_oracle/.logstash_jdbc_last_run_3:/home/ph/.logstash_jdbc_last_run_3 \
-v /home/data/pipeline/logstash/jdbc_oracle/.logstash_jdbc_last_run_4:/home/ph/.logstash_jdbc_last_run_4 \
poc_pfm/logstash:1

So again, my question: what am I doing wrong that produces these duplicate records?

Thank you for helping 🙂

Jan


(Jan) #2

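For anyone interested in the cause and the solution:
Logstash seems to take the new value of the tracking column (:sql_last_value) from the last record it receives. Oracle (like SQL databases in general) does not guarantee any row order unless the query has an ORDER BY clause, so the last record received is not necessarily the one with the highest ID. The next scheduled run then selects every row with an ID above that lower value again, including rows that were already loaded, and each of those rows is re-indexed under its existing document ID, which leaves the previous version behind as a deleted document.

We solved this by adding "ORDER BY ID" to each SQL statement, which guarantees that the last record received carries the highest value of the tracking column.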
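The effect can be reproduced with a small simulation. It is a hypothetical model, not the logstash-input-jdbc source: it assumes that without ORDER BY the database returns matching rows in a fixed storage order (the made-up parameter physical_order), and that after every run :sql_last_value is set from the last row received, as described above. The starting value 0 matches the plugin's documented default when use_column_value is true and a tracking_column is set.

```python
from typing import List, Tuple


def simulate_tracking_column(physical_order: List[int], order_by_id: bool) -> Tuple[int, int]:
    """Simulate repeated runs of
        SELECT ... WHERE ID > :sql_last_value [ORDER BY ID]
    with use_column_value => true and tracking_column => "ID".

    Hypothetical model: without ORDER BY, rows come back in the order of
    `physical_order`; after each run, sql_last_value is the ID of the LAST
    row received (not the maximum ID of the run).

    Returns (rows_fetched, distinct_ids_fetched).
    """
    sql_last_value = 0  # default starting value for a numeric tracking column
    fetched: List[int] = []
    while True:
        result = [row_id for row_id in physical_order if row_id > sql_last_value]
        if order_by_id:
            result.sort()
        if not result:
            break  # nothing above sql_last_value: the load is complete
        fetched.extend(result)
        sql_last_value = result[-1]  # last row received, not max(result)
    return len(fetched), len(set(fetched))


table = [3, 1, 4, 5, 2]  # hypothetical storage order of IDs 1..5
print(simulate_tracking_column(table, order_by_id=False))  # (8, 5)
print(simulate_tracking_column(table, order_by_id=True))   # (5, 5)
```

Without ordering, the first run ends on ID 2, so the second run fetches IDs 3, 4 and 5 again; with ORDER BY ID the first run ends on the maximum ID and nothing is fetched twice. Each surplus fetch would be indexed again under the same document_id.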

