Hi,
Below is my Logstash conf file. I am trying to load data from an Oracle table that has 320 records into Elasticsearch, but only one record got loaded into ES.
I have also enabled the dead letter queue, but it only created a file with the value 1 under file 1.
Please help me understand what went wrong.
Oracle table structure (CUSTOMERS table)

Name          Null?      Type
CUSTOMER_ID   NOT NULL   NUMBER
NAME          NOT NULL   VARCHAR2(255)
ADDRESS                  VARCHAR2(255)
WEBSITE                  VARCHAR2(255)
CREDIT_LIMIT             NUMBER(8,2)
Elasticsearch index mapping

PUT /customerindex
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "@version": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "address": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "credit_limit": {
        "type": "integer"
      },
      "customer_id": {
        "type": "integer"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "website": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}
My Logstash conf file
# Sample Logstash configuration for creating a simple
# Oracle -> Logstash -> Elasticsearch pipeline.
input {
  jdbc {
    # Path to the downloaded JDBC driver jar (must be on the classpath)
    jdbc_driver_library => "C:\Users\Purnima\Downloads\ojdbc-full\OJDBC-Full\ojdbc6.jar"
    # Oracle driver class
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # Oracle JDBC connection string: jdbc:oracle:thin:@hostname:PORT/SERVICE
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521/orcl"
    # The user and password to connect to the database
    jdbc_user => "test"
    jdbc_password => "test"
    # Cron schedule controlling how frequently the query is executed
    #schedule => "*/5 * * * *"
    # Inline query; :sql_last_value (numeric or timestamp) limits it to rows newer than the last run
    statement => "select * from customers where customer_id>:sql_last_value order by customer_id asc"
    # Configuration for tracking the last run value
    use_column_value => true
    tracking_column => "customer_id"
    tracking_column_type => "numeric"
    record_last_run => true
    # This file stores sql_last_value so the next run can pick up where the last one left off
    last_run_metadata_path => "C:\Users\Purnima\Documents\logstash-7.8.0\lastrun\.logstash_jdbc_test_last_run"
  }
}
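The sql_last_value bookkeeping above can be sketched in a few lines of plain Python. This is a simplified illustration of what the jdbc input does, not Logstash's actual implementation, and the row data is made up:

```python
# Simplified sketch of the jdbc input's sql_last_value tracking:
# each run fetches only rows whose tracking column exceeds the
# value remembered from the previous run.

rows = [{"customer_id": i, "name": f"customer-{i}"} for i in range(1, 321)]

def run_query(last_value):
    """Mimics: select * from customers where customer_id > :sql_last_value"""
    return [r for r in rows if r["customer_id"] > last_value]

sql_last_value = 0                          # initial value before any run
batch = run_query(sql_last_value)           # first run fetches all 320 rows
sql_last_value = batch[-1]["customer_id"]   # persisted to last_run_metadata_path

print(len(batch), sql_last_value)           # 320 320
print(len(run_query(sql_last_value)))       # 0 -- next run fetches nothing new
```

So on the first run all 320 rows should come through, and subsequent scheduled runs only pick up rows with a higher customer_id.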
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "customerindex"
    doc_as_upsert => true
    document_id => customer_id
    #user => "elastic"
    #password => "changeme"
  }
}
What do you see in the Logstash log?
Thank you for your reply. Here is the log:
[2020-07-01T21:38:14,189][DEBUG][org.logstash.config.ir.CompiledPipeline][main] Compiled output
P[output-elasticsearch{"hosts"=>["http://localhost:9200"], "index"=>"customerindex", "doc_as_upsert"=>"true", "document_id"=>"customer_id"}|[file]C:/Users/Purnima/Documents/logstash-7.8.0/config/logstash-oracle-input.conf:40:3:```
elasticsearch {
hosts => ["http://localhost:9200"]
index => "customerindex"
doc_as_upsert =>true
document_id=> customer_id
#user => "elastic"
#password => "changeme"
}
into
org.logstash.config.ir.compiler.ComputeStepSyntaxElement@92ba2de1
[2020-07-01T21:38:14,609][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-07-01T21:38:14,626][DEBUG][logstash.javapipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x13c07e3b run>"}
[2020-07-01T21:38:14,629][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2020-07-01T21:38:14,685][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-07-01T21:38:14,721][DEBUG][logstash.agent ] Starting puma
[2020-07-01T21:38:14,740][DEBUG][logstash.agent ] Trying to start WebServer {:port=>9600}
[2020-07-01T21:38:14,807][DEBUG][logstash.api.service ] [api-service] start
[2020-07-01T21:38:15,144][DEBUG][logstash.inputs.jdbc ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] loading C:\Users\Purnima\Downloads\ojdbc-full\OJDBC-Full\ojdbc6.jar
[2020-07-01T21:38:15,164][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2020-07-01T21:38:16,259][INFO ][logstash.inputs.jdbc ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] (0.029207s) select version from PRODUCT_COMPONENT_VERSION where lower(product) like 'oracle%'
[2020-07-01T21:38:16,448][INFO ][logstash.inputs.jdbc ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] (0.004169s) SELECT count(*) "COUNT" FROM (select * from customers where customer_id>0 order by customer_id asc) "T1" FETCH NEXT 1 ROWS ONLY
[2020-07-01T21:38:16,465][INFO ][logstash.inputs.jdbc ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] (0.000884s) SELECT count(*) "COUNT" FROM (select * from customers where customer_id>0 order by customer_id asc) "T1" FETCH NEXT 1 ROWS ONLY
[2020-07-01T21:38:16,470][DEBUG][logstash.inputs.jdbc ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] Executing JDBC query {:statement=>"select * from customers where customer_id>:sql_last_value order by customer_id asc", :parameters=>{:sql_last_value=>0}, :count=>320}
[2020-07-01T21:38:16,479][INFO ][logstash.inputs.jdbc ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] (0.001382s) select * from customers where customer_id>0 order by customer_id asc
Oracle version is 12.2.0.1.0
The jdbc input thinks it is fetching 320 rows. Is it possible they all have the same customer_id, so the rows are all overwritten in Elasticsearch?
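One quick way to check that hypothesis (a standard count request against the index, not something from the original post):

```
GET /customerindex/_count
```

If this returns "count": 1 while the source table has 320 rows, the events are all being written over the same _id.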
You are right: because the document id was the same for every record, each new record overwrote the previous one. I have fixed it by changing it to document_id => "%{customer_id}".
Previously I had given just the literal customer_id, which is the same for every event.
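The overwrite behaviour described above can be reproduced with a toy model in Python. Here an index is modelled as a dict keyed by _id, and a crude sprintf stand-in mimics Logstash's "%{customer_id}" field interpolation; none of this is Elasticsearch's or Logstash's real code:

```python
# Toy model: an Elasticsearch index behaves like a dict keyed by _id,
# so indexing many events under one _id leaves a single document.

events = [{"customer_id": i} for i in range(1, 321)]

def sprintf(template, event):
    """Crude stand-in for Logstash's %{field} interpolation."""
    for field, value in event.items():
        template = template.replace("%{" + field + "}", str(value))
    return template

# document_id => customer_id  -- a literal string, same _id for every event
index_literal = {}
for event in events:
    index_literal["customer_id"] = event   # every event overwrites _id "customer_id"

# document_id => "%{customer_id}"  -- interpolated, unique _id per event
index_interpolated = {}
for event in events:
    index_interpolated[sprintf("%{customer_id}", event)] = event

print(len(index_literal))        # 1   -- only the last record survives
print(len(index_interpolated))   # 320 -- all records indexed
```

With the literal id, only the last of the 320 records survives, which matches the single document seen in the index.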