JDBC plugin ships repeated data from MongoDB

Hi,

I am using the Logstash JDBC plugin to ship data from MongoDB to Elasticsearch. The data stored in MongoDB is in JSON documents. When Logstash fetches the data every 1 minute, it ships repeated data: the total number of documents in the DB is 20378, but Logstash ships all 20378 documents on every scheduled run. I need immediate help. Below is the Logstash configuration file.

</>
input {
  jdbc {
    jdbc_driver_library => "C:/Users/home/Downloads/Backup/Downloads/mongo-java-driver-3.4.2.jar,C:/Users/home/Downloads/Backup/Downloads/mongojdbc1.2.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://***:***/*****"
    jdbc_user => "admin"
    schedule => "*/1 * * * *"
    statement => "db.DeviceServiceData.find()"
    last_run_metadata_path => "C:/Users/home/.logstash_jdbc_last_run"
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "myindex"
  }
  stdout { codec => rubydebug }
}
</>

By running the above Logstash configuration file I am able to receive the data, but it is repeated: every minute Logstash goes and fetches the complete data set from the DB. I need only the updated documents. Below are the Logstash logs.

</>
[2019-07-11T06:07:01,719][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.1.1"}
[2019-07-11T06:07:10,202][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2019-07-11T06:07:10,437][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2019-07-11T06:07:10,496][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2019-07-11T06:07:10,500][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>7}
[2019-07-11T06:07:10,527][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2019-07-11T06:07:10,541][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2019-07-11T06:07:10,563][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x2dcff742 run>"}
[2019-07-11T06:07:10,748][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2019-07-11T06:07:10,879][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2019-07-11T06:07:11,038][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-07-11T06:07:12,075][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2019-07-11T06:08:01,141][INFO ][org.mongodb.driver.cluster] Cluster created with settings {hosts=:*****], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
[2019-07-11T06:08:01,227][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:1, serverValue:2954530}] to *:
[2019-07-11T06:08:01,230][INFO ][org.mongodb.driver.cluster] Monitor thread successfully connected to server with description ServerDescription{address=:, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, roundTripTimeNanos=1218900}
[2019-07-11T06:08:01,664][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:2, serverValue:2954531}] to *****:*****
[2019-07-11T06:08:01,901][INFO ][logstash.inputs.jdbc ] (0.496248s) db.DeviceServiceData.find()
[2019-07-11T06:09:00,260][INFO ][org.mongodb.driver.cluster] Cluster created with settings {hosts=[:], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
[2019-07-11T06:09:00,270][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:3, serverValue:2954584}] to :
[2019-07-11T06:09:00,270][INFO ][org.mongodb.driver.cluster] Monitor thread successfully connected to server with description ServerDescription{address=:****, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, roundTripTimeNanos=926300}
[2019-07-11T06:09:00,313][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:4, serverValue:2954585}] to :
[2019-07-11T06:09:00,334][INFO ][logstash.inputs.jdbc ] (0.034389s) db.DeviceServiceData.find()
[2019-07-11T06:10:00,476][INFO ][org.mongodb.driver.cluster] Cluster created with settings {hosts=[:], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
[2019-07-11T06:10:00,485][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:5, serverValue:2954640}] to :
[2019-07-11T06:10:00,486][INFO ][org.mongodb.driver.cluster] Monitor thread successfully connected to server with description ServerDescription{address=:****, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, roundTripTimeNanos=836600}
[2019-07-11T06:10:00,504][INFO ][org.mongodb.driver.connection] Opened connection [connectionId{localValue:6, serverValue:2954641}] to :
[2019-07-11T06:10:00,521][INFO ][logstash.inputs.jdbc ] (0.025582s) db.DeviceServiceData.find()
</>

If you want the jdbc input to maintain state, then the query has to depend on :sql_last_value. The documentation has an example using a simple SQL statement.
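
For reference, the documented pattern looks roughly like this (a sketch only; the table and column names here are placeholders, not taken from your setup):

input {
  jdbc {
    # :sql_last_value is substituted by the plugin with the value it saved
    # in last_run_metadata_path at the end of the previous run
    statement => "SELECT * FROM some_table WHERE updated_at > :sql_last_value"
    schedule => "*/1 * * * *"
  }
}

Whatever form the statement takes, the key point is that it filters on :sql_last_value; otherwise every scheduled run returns the whole table again.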

Thanks for the reply Badger. I have gone through that documentation and updated last_run_metadata_path to /.logstash_jdbc_last_run, but that didn't solve my problem. I am still getting duplicate data and it keeps repeating. The total number of documents I have in my DB is 20000, but every minute my Logstash fetches all 20000.

Have you modified your stored procedure to filter the result set based on the value of sql_last_value?

Yes, I have tried changing the statement like below:

Statement => from * db.subscriber_inventory.find() where sequenceid > sql_last_value

Even that didn't work

I do not do SQL beyond the most basic SELECTs, so I am unable to assist further.

My database is MongoDB. If you can help me with this it would be great; I am really helpless, running out of storage and facing many issues because of this repeated data. Thanks for your help.

As I said, I cannot help with the SQL.

It would be possible to use a fingerprint filter to create a document_id for the elasticsearch output. That would result in you overwriting the documents each time rather than creating new documents. Much less efficient than filtering in the SQL but would resolve part of the problem.
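
The general shape would be something like this (only a sketch; I do not know what your documents look like, so "some_unique_field" is a placeholder for whatever field or fields uniquely identify a record in your data):

filter {
  fingerprint {
    # hash the field(s) that uniquely identify a record
    source => ["some_unique_field"]
    method => "SHA256"
    target => "[@metadata][fingerprint]"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "myindex"
    # re-indexing the same record then overwrites the existing document
    # instead of creating a new duplicate
    document_id => "%{[@metadata][fingerprint]}"
  }
}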

Hi Badger,

I have added the document_id parameter in the output and assigned it the unique value of my document, like below in my configuration file.

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "commands"
    document_id => "%{messageid}"
  }
  stdout { codec => rubydebug }
}

Now what happened is:

  1. It stopped pulling in multiple copies of the documents, but every time my Logstash runs it overwrites the existing documents that have the same unique document_id I had given, and the timestamp gets updated to a new timestamp each time. If it stays like this I can't get the data for the last 7 days or so.
  2. Whenever my Logstash configuration runs with those duplicate documents, my document count does not increase, but the storage size of the index keeps increasing.

Also, can you give me the Logstash configuration with the fingerprint filter you suggested, making the necessary changes to the configuration I have given?

I have no idea what your data looks like so I cannot suggest how to configure the fingerprint filter.

I really think you should focus on modifying the stored procedure so that it accepts :sql_last_value as a parameter and filters out older records.
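
Conceptually that means something along these lines in the input (a sketch only; "timestampfield" stands in for whatever field in your documents is unique and increasing, and whether the DBSchema driver exposes it in a way the plugin can track as a column is something you would need to test):

input {
  jdbc {
    # only fetch documents newer than the value saved after the previous run
    statement => "db.DeviceServiceData.find({ timestampfield: { $gte: :sql_last_value } })"
    # track the highest value of that field instead of the run time
    use_column_value => true
    tracking_column => "timestampfield"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "C:/Users/home/.logstash_jdbc_last_run"
    schedule => "*/1 * * * *"
  }
}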

I had set up last_run_metadata_path and it is getting updated with the last run timestamp, as below:
--- 2019-07-18 06:10:01.992880000 Z

But in my document the timestamp field is in ISODATE format, like below:

2019-05-27T20:07:45.486Z

Can you help me change it to the ISODATE format of my timestamp field?

Can I change the default Logstash timestamp to ISODATE format? If yes, how can I change it?

I do not know how to do that.

The timestamp stored in logstash_jdbc_last_run is not in ISODATE format. Can anyone help me change the format of logstash_jdbc_last_run to ISODATE format?


Hi Badger,

Thanks for your help. I found a solution for this repeated data; I am posting it so it may be helpful to someone. I changed the statement to a conditional query, and I used a fingerprint filter to add a new field with the value of a timestamp in my data, which is unique and incremental for every document.

input {
  jdbc {
    jdbc_driver_library => "C:/Users/home/Downloads/Backup/Downloads/mongo-java-driver-3.4.2.jar,C:/Users/home/Downloads/Backup/Downloads/mongojdbc1.2.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://***:***/*****"
    jdbc_user => "admin"
    schedule => "*/60 * * * *"
    statement => "db.databasename.find({ timestampfield: { $gte: (:sql_last_value) } })"
    last_run_metadata_path => "C:/Users/home/.logstash_jdbc_last_run"
  }
}

filter {
  fingerprint {
    add_field => { "fingerprint" => "%{timestampfield}" }
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "myindex"
  }
  stdout { codec => rubydebug }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.