Only one record is loaded to Elasticsearch using a Logstash conf file

Hi,
Below is my Logstash conf file. I am trying to load data from an Oracle table that has 320 records into Elasticsearch, but only one record got loaded into ES.

I have also enabled the dead letter queue, but it only created a file whose content is the value 1.
Please help me understand what went wrong.

Oracle table structure - Customer table

Name          Null?     Type
------------  --------  --------------
CUSTOMER_ID   NOT NULL  NUMBER
NAME          NOT NULL  VARCHAR2(255)
ADDRESS                 VARCHAR2(255)
WEBSITE                 VARCHAR2(255)
CREDIT_LIMIT            NUMBER(8,2)
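For reference, the source row count can be confirmed on the Oracle side with a simple query (it should match the 320 rows the jdbc input later reports):

-- Sanity check on the source side: this is the number of documents
-- I expect to end up in Elasticsearch
select count(*) from customers;
-- => 320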

Elastic Index Mapping

PUT /customerindex
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "@version": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "address": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "credit_limit": {
        "type": "integer"
      },
      "customer_id": {
        "type": "integer"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "website": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}
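For comparison, the number of documents that actually made it into the index can be checked with the standard _count API:

# Sanity check: count documents in the index after the pipeline runs.
# Here it reports "count": 1, even though the source table has 320 rows.
GET /customerindex/_count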

My Logstash conf file

# Sample Logstash configuration for a simple
# Oracle -> Logstash -> Elasticsearch pipeline.

input {
  jdbc {
    # Path to the downloaded JDBC driver jar, added to the class path
    jdbc_driver_library => "C:\Users\Purnima\Downloads\ojdbc-full\OJDBC-Full\ojdbc6.jar"

    # Oracle driver class
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"

    # Oracle JDBC connection string: jdbc:oracle:thin:@hostname:PORT/SERVICE
    jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521/orcl"

    # The user and password used to connect to the database
    jdbc_user => "test"
    jdbc_password => "test"

    # Cron schedule controlling how frequently the query runs against the database
    #schedule => "*/5 * * * *"

    # Inline query; :sql_last_value (numeric or timestamp) restricts each run
    # to records added since the previous execution
    statement => "select * from customers where customer_id>:sql_last_value order by customer_id asc"

    # Configuration for tracking the last run value
    use_column_value => true
    tracking_column => "customer_id"
    tracking_column_type => "numeric"
    record_last_run => true
    # This file keeps a record of sql_last_value so the next run can pick up where the last one stopped
    last_run_metadata_path => "C:\Users\Purnima\Documents\logstash-7.8.0\lastrun\.logstash_jdbc_test_last_run"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "customerindex"
    doc_as_upsert => true
    document_id => "customer_id"
    #user => "elastic"
    #password => "changeme"
  }
}
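As an aside, the last_run_metadata_path file just stores the most recent sql_last_value serialized as YAML; after a successful run over all 320 rows I would expect it to contain something like:

--- 320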

What do you see in the Logstash log?

Thank you for your reply.

Here is the log:

[2020-07-01T21:38:14,189][DEBUG][org.logstash.config.ir.CompiledPipeline][main] Compiled output
P[output-elasticsearch{"hosts"=>["http://localhost:9200"], "index"=>"customerindex", "doc_as_upsert"=>"true", "document_id"=>"customer_id"}|[file]C:/Users/Purnima/Documents/logstash-7.8.0/config/logstash-oracle-input.conf:40:3:```
elasticsearch {
hosts => ["http://localhost:9200"]
index => "customerindex"
doc_as_upsert =>true
document_id=> customer_id
#user => "elastic"
#password => "changeme"
}

 into 
 org.logstash.config.ir.compiler.ComputeStepSyntaxElement@92ba2de1
[2020-07-01T21:38:14,609][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-07-01T21:38:14,626][DEBUG][logstash.javapipeline    ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x13c07e3b run>"}
[2020-07-01T21:38:14,629][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2020-07-01T21:38:14,685][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-07-01T21:38:14,721][DEBUG][logstash.agent           ] Starting puma
[2020-07-01T21:38:14,740][DEBUG][logstash.agent           ] Trying to start WebServer {:port=>9600}
[2020-07-01T21:38:14,807][DEBUG][logstash.api.service     ] [api-service] start
[2020-07-01T21:38:15,144][DEBUG][logstash.inputs.jdbc     ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] loading C:\Users\Purnima\Downloads\ojdbc-full\OJDBC-Full\ojdbc6.jar
[2020-07-01T21:38:15,164][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-07-01T21:38:16,259][INFO ][logstash.inputs.jdbc     ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] (0.029207s) select version from PRODUCT_COMPONENT_VERSION where lower(product) like 'oracle%'
[2020-07-01T21:38:16,448][INFO ][logstash.inputs.jdbc     ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] (0.004169s) SELECT count(*) "COUNT" FROM (select * from customers where customer_id>0 order by customer_id asc) "T1" FETCH NEXT 1 ROWS ONLY
[2020-07-01T21:38:16,465][INFO ][logstash.inputs.jdbc     ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] (0.000884s) SELECT count(*) "COUNT" FROM (select * from customers where customer_id>0 order by customer_id asc) "T1" FETCH NEXT 1 ROWS ONLY
[2020-07-01T21:38:16,470][DEBUG][logstash.inputs.jdbc     ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] Executing JDBC query {:statement=>"select * from customers where customer_id>:sql_last_value order by customer_id asc", :parameters=>{:sql_last_value=>0}, :count=>320}
[2020-07-01T21:38:16,479][INFO ][logstash.inputs.jdbc     ][main][ae3da51c10671a37176f72c7c453442afbeb19eb9a76b5c90d6a0a04136f64fc] (0.001382s) select * from customers where customer_id>0 order by customer_id asc

The Oracle version is 12.2.0.1.0.

The jdbc input thinks it is fetching 320 rows. Is it possible they all have the same customer_id, so the rows are all overwritten in Elasticsearch?
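One thing to note: in your output you have document_id => "customer_id", which Logstash treats as the literal string (there is no %{} sprintf reference), so every event would be written to the same _id. If that is what happened, fetching that single document should confirm it:

# If the literal string was used as the id, this returns the one surviving document:
GET /customerindex/_doc/customer_id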

You are right. Because every record was indexed with the same document ID, each one overwrote the previous, so only one record remained. I have fixed it by changing the setting to document_id => "%{customer_id}".

Previously I had given just "customer_id", which is taken as a literal string and is therefore the same for every event.
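For completeness, here is the corrected output block. The %{customer_id} sprintf reference resolves to each event's customer_id field, so every document gets its own _id:

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "customerindex"
    doc_as_upsert => true
    # sprintf reference: resolves to the event's customer_id field value
    document_id => "%{customer_id}"
  }
}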
