Hi!
I'm working on a PoC where we are evaluating how to ingest data from an Oracle table into Elasticsearch using Logstash and multiple connections to the Oracle DB.
While doing this I observed a strange behaviour I can't explain, so maybe one of you can give me a hint as to what my misunderstanding is.
The observation I made is that when I do a full load of a table from the Oracle DB into an empty Elasticsearch index, Logstash seems to write certain records multiple times.
To be precise: the Oracle table contains 7656494 records. When the load is done, the Elasticsearch index contains 7656494 documents, but also 34663 deleted documents (this number varies when I rerun the whole process). The only way I can explain the deleted documents is that Logstash loads some records more than once.
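My understanding (please correct me if this is wrong) is that Elasticsearch handles a write to an already existing `_id` as a delete of the old version plus an index of the new one, so `docs.deleted` would grow with every re-ingested record. A tiny sketch of that accounting, just to illustrate my reasoning (plain Python, not any Elasticsearch API):

```python
# Model of why overwriting the same _id leaves "deleted" documents behind:
# an update to an existing _id is internally a delete + reindex, so each
# overwrite adds one tombstone that shows up in docs.deleted until a merge.
# Illustrative sketch only; the names here are made up for the example.

def index_events(events):
    """Index (id, doc) pairs, counting overwrites as deletions."""
    live = {}       # _id -> current document version
    deleted = 0     # tombstones left behind by overwrites
    for doc_id, doc in events:
        if doc_id in live:
            deleted += 1  # old version is marked deleted, new one indexed
        live[doc_id] = doc
    return len(live), deleted

# 7 events, but ID 3 arrives twice -> 6 live documents, 1 deleted
events = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (3, "c2"), (6, "f")]
print(index_events(events))  # (6, 1)
```

That would match what I see: the live count equals the table size, and `docs.deleted` equals the number of records that were loaded a second time.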
My setup runs entirely in Docker containers on a single machine. I use a self-built Logstash image, but it only adds two plugins. Here is the Dockerfile:
FROM docker.elastic.co/logstash/logstash-oss:6.4.1
RUN bin/logstash-plugin install logstash-filter-prune
RUN bin/logstash-plugin install logstash-output-jdbc
This is the DDL for the Oracle table:
CREATE TABLE POC.UMSATZKOMPAKT (
  ID INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 1) NOT NULL PRIMARY KEY,
  FIELD1 VARCHAR2(1024 CHAR),
  FIELD2 VARCHAR2(1024 CHAR),
  FIELD3 VARCHAR2(1024 CHAR),
  INGEST_TS TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
);
CREATE INDEX POC.IDX_INGEST_TS on POC.UMSATZKOMPAKT(INGEST_TS);
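Since ID is the primary key, duplicates at the source should be impossible anyway, but a quick check like the following (untested sketch) should confirm that and return no rows:

```sql
-- Check for duplicate IDs in the source table
-- (the primary key should guarantee this returns nothing)
SELECT ID, COUNT(*) AS cnt
FROM POC.UMSATZKOMPAKT
GROUP BY ID
HAVING COUNT(*) > 1;
```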
This is the pipeline config:
# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
  jdbc {
    statement => "SELECT ID, FIELD1, FIELD2, FIELD3 FROM POC.UMSATZKOMPAKT WHERE ID > :sql_last_value AND mod(to_number(to_char(ingest_ts, 'ss')),4) = 0"
    use_column_value => true
    tracking_column => "ID"
    jdbc_driver_library => "/usr/share/logstash/config/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:xe"
    jdbc_user => "poc"
    jdbc_password => "poc"
    schedule => "*/1 * * * * *"
    last_run_metadata_path => "/home/ph/.logstash_jdbc_last_run_1"
    lowercase_column_names => false
    tags => ["oracle"]
  }
  jdbc {
    statement => "SELECT ID, FIELD1, FIELD2, FIELD3 FROM POC.UMSATZKOMPAKT WHERE ID > :sql_last_value AND mod(to_number(to_char(ingest_ts, 'ss')),4) = 1"
    use_column_value => true
    tracking_column => "ID"
    jdbc_driver_library => "/usr/share/logstash/config/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:xe"
    jdbc_user => "poc"
    jdbc_password => "poc"
    schedule => "*/1 * * * * *"
    last_run_metadata_path => "/home/ph/.logstash_jdbc_last_run_2"
    lowercase_column_names => false
    tags => ["oracle"]
  }
  jdbc {
    statement => "SELECT ID, FIELD1, FIELD2, FIELD3 FROM POC.UMSATZKOMPAKT WHERE ID > :sql_last_value AND mod(to_number(to_char(ingest_ts, 'ss')),4) = 2"
    use_column_value => true
    tracking_column => "ID"
    jdbc_driver_library => "/usr/share/logstash/config/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:xe"
    jdbc_user => "poc"
    jdbc_password => "poc"
    schedule => "*/1 * * * * *"
    last_run_metadata_path => "/home/ph/.logstash_jdbc_last_run_3"
    lowercase_column_names => false
    tags => ["oracle"]
  }
  jdbc {
    statement => "SELECT ID, FIELD1, FIELD2, FIELD3 FROM POC.UMSATZKOMPAKT WHERE ID > :sql_last_value AND mod(to_number(to_char(ingest_ts, 'ss')),4) = 3"
    use_column_value => true
    tracking_column => "ID"
    jdbc_driver_library => "/usr/share/logstash/config/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:xe"
    jdbc_user => "poc"
    jdbc_password => "poc"
    schedule => "*/1 * * * * *"
    last_run_metadata_path => "/home/ph/.logstash_jdbc_last_run_4"
    lowercase_column_names => false
    tags => ["oracle"]
  }
}
# The filter part of this file is commented out to indicate that it is
# optional.
#filter {
# ruby {
# code => "event.set('logstash_processed_at', Time.now());"
# }
# metrics {
# meter => "events"
# add_tag => "metric"
# }
#}
output {
  # stdout { codec => rubydebug }
  # if "metric" in [tags] {
  #   stdout { codec => rubydebug }
  # }
  if "oracle" in [tags] {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      index => "test2"
      document_id => "%{ID}"
    }
  }
}
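My intention with the four inputs is that the `mod(..., 4)` conditions on the seconds of INGEST_TS partition the table into four disjoint sets that together cover every row, so no record should ever be selected by two inputs. A small sanity check of that logic (plain Python, just mirroring the SQL expression):

```python
# Sanity check: the predicates mod(second, 4) = 0..3 split the possible
# seconds values 0..59 into four disjoint sets that together cover all rows,
# so no row should match more than one of the four jdbc inputs.
seconds = range(60)  # to_char(ingest_ts, 'ss') yields '00'..'59'
partitions = [{s for s in seconds if s % 4 == r} for r in range(4)]

# Disjoint: no seconds value falls into two partitions
for i in range(4):
    for j in range(i + 1, 4):
        assert not (partitions[i] & partitions[j])

# Complete: together they cover all 60 possible values
assert set().union(*partitions) == set(seconds)
print([len(p) for p in partitions])  # [15, 15, 15, 15]
```

So the partitioning itself looks correct to me, which is why I suspect the duplicates come from somewhere else (e.g. how `sql_last_value` is tracked per input while the scheduled queries overlap).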
And this is how I start the Logstash container:
docker run --rm -it \
-e "PIPELINE_WORKERS=4" \
-e "LS_JAVA_OPTS=-Xms10g -Xmx10g" \
-e "LOG_LEVEL=info" \
--name="logstash_oracle" \
--network="host" \
-v /home/data/pipeline/logstash/pipeline_oracle_multithread/:/usr/share/logstash/pipeline/ \
-v /home/data/pipeline/logstash/jdbc_oracle/ojdbc6.jar:/usr/share/logstash/config/ojdbc6.jar \
-v /home/data/pipeline/logstash/jdbc_oracle/.logstash_jdbc_last_run_1:/home/ph/.logstash_jdbc_last_run_1 \
-v /home/data/pipeline/logstash/jdbc_oracle/.logstash_jdbc_last_run_2:/home/ph/.logstash_jdbc_last_run_2 \
-v /home/data/pipeline/logstash/jdbc_oracle/.logstash_jdbc_last_run_3:/home/ph/.logstash_jdbc_last_run_3 \
-v /home/data/pipeline/logstash/jdbc_oracle/.logstash_jdbc_last_run_4:/home/ph/.logstash_jdbc_last_run_4 \
poc_pfm/logstash:1
So again my question: what am I doing wrong that produces these duplicate records?
Thank you for helping
Jan