Hi,
I am using the Logstash JDBC input plugin to ship data from MongoDB to Elasticsearch. The data in MongoDB is stored as JSON documents. Logstash is scheduled to fetch the data every minute, but each run returns the same data again: the collection holds 20378 documents in total, and Logstash ships all 20378 of them on every scheduled run. I need help with this urgently. Below is the Logstash configuration file.
</>
input {
  jdbc {
    # MongoDB Java driver plus the JDBC wrapper, comma-separated
    jdbc_driver_library => "C:/Users/home/Downloads/Backup/Downloads/mongo-java-driver-3.4.2.jar,C:/Users/home/Downloads/Backup/Downloads/mongojdbc1.2.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://***:***/*****"
    jdbc_user => "admin"
    # run the statement once every minute
    schedule => "*/1 * * * *"
    statement => "db.DeviceServiceData.find()"
    last_run_metadata_path => "C:/Users/home/.logstash_jdbc_last_run"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "myindex"
  }
  # also print every event to the console
  stdout { codec => rubydebug }
}
</>
With the above configuration I do receive the data, but it is repeated: every minute Logstash fetches the complete contents of the collection again. I need only the updated data. Below are the Logstash logs.
</>
[2019-07-11T06:07:01,719][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.1.1"}
[2019-07-11T06:07:10,202][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2019-07-11T06:07:10,437][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2019-07-11T06:07:10,496][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2019-07-11T06:07:10,500][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2019-07-11T06:07:10,527][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2019-07-11T06:07:10,541][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2019-07-11T06:07:10,563][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x2dcff742 run>"}
[2019-07-11T06:07:10,748][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2019-07-11T06:07:10,879][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2019-07-11T06:07:11,038][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-07-11T06:07:12,075][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2019-07-11T06:08:01,141][INFO ][org.mongodb.driver.cluster] Cluster created with settings {hosts=:*****], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
[2019-07-11T06:08:01,227][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:1, serverValue:2954530}] to *:
[2019-07-11T06:08:01,230][INFO ][org.mongodb.driver.cluster] Monitor thread successfully connected to server with description ServerDescription{address=:, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, roundTripTimeNanos=1218900}
[2019-07-11T06:08:01,664][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:2, serverValue:2954531}] to*****:*****
[2019-07-11T06:08:01,901][INFO ][logstash.inputs.jdbc ] (0.496248s) db.DeviceServiceData.find()
[2019-07-11T06:09:00,260][INFO ][org.mongodb.driver.cluster] Cluster created with settings {hosts=[:], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
[2019-07-11T06:09:00,270][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:3, serverValue:2954584}] to :
[2019-07-11T06:09:00,270][INFO ][org.mongodb.driver.cluster] Monitor thread successfully connected to server with description ServerDescription{address=:****, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, roundTripTimeNanos=926300}
[2019-07-11T06:09:00,313][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:4, serverValue:2954585}] to :
[2019-07-11T06:09:00,334][INFO ][logstash.inputs.jdbc ] (0.034389s) db.DeviceServiceData.find()
[2019-07-11T06:10:00,476][INFO ][org.mongodb.driver.cluster] Cluster created with settings {hosts=[:], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
[2019-07-11T06:10:00,485][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:5, serverValue:2954640}] to :
[2019-07-11T06:10:00,486][INFO ][org.mongodb.driver.cluster] Monitor thread successfully connected to server with description ServerDescription{address=:****, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, roundTripTimeNanos=836600}
[2019-07-11T06:10:00,504][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:6, serverValue:2954641}] to :
[2019-07-11T06:10:00,521][INFO ][logstash.inputs.jdbc ] (0.025582s) db.DeviceServiceData.find()
</>
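One direction that may address the repeated fetches described above is the JDBC input's incremental tracking. The options use_column_value, tracking_column and tracking_column_type make the plugin record the last value it has seen and expose it to the statement as :sql_last_value. The fragment below is only an untested sketch. It assumes that every document carries a numeric, ever-increasing field (given the hypothetical name seq here), that the driver returns this field as a top-level column, and that the MongoDB JDBC driver accepts the query once the value has been substituted. None of these assumptions is confirmed by the configuration or logs above.
</>
input {
  jdbc {
    # driver, connection and schedule settings as in the configuration above
    # Assumption: "seq" is a hypothetical numeric field that grows with every insert or update
    statement => "db.DeviceServiceData.find({ seq: { $gt: :sql_last_value } })"
    # track the highest "seq" seen instead of the time of the last run
    use_column_value => true
    tracking_column => "seq"
    tracking_column_type => "numeric"
    # the tracked value is persisted here between runs
    last_run_metadata_path => "C:/Users/home/.logstash_jdbc_last_run"
  }
}
</>
Without a tracking column, :sql_last_value holds the time of the previous run, and the statement still has to reference it for the query to be filtered. A plain find() with no condition returns the whole collection on every run.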