Logstash JDBC Input table data not in sync with Kibana Index


(Jatin) #1

Hello All,

I want to create a dashboard with visualizations based on data from an Oracle database.
I want it to be near real time: when a new row is inserted into the table, it should be reflected in the index in Kibana. For that I am using the schedule option of the JDBC input plugin.

My config file is:

input {
    jdbc {
        jdbc_connection_string => "jdbc:oracle:thin:@hostname:port/XE"
        jdbc_user => "username"
        jdbc_password => "password"
        jdbc_driver_library => "/path/mybox/KibanaDbTry/ojdbc7.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        #jdbc_validate_connection => true
        schedule => "* * * * *"
        statement => "SELECT * from thetable WHERE user_id > :sql_last_value ORDER by user_id"
        use_column_value => true
        tracking_column => "user_id"
        tracking_column_type => "numeric"
        clean_run => true
        record_last_run => true
        last_run_metadata_path => "/path/.logstash_jdbc_last_run"
    }
}
output {
    stdout { codec => json_lines }
    elasticsearch {
        index => "eeeeee"
        hosts => "http://localhost:9201"
        document_type => "schedules"
        document_id => "%{user_id}"
    }
}

The problem is that on the first run the index matches the SQL table, but when I make changes to the table they are not reflected in the Kibana index / Discover tab.
In 2 cases I have also observed that the inserted rows were removed from the Oracle database itself.

The table has only 2 columns: Id and Username.

The logs are:

[2018-10-22T10:45:29,164][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/apps/tomcat/elk/ELK/logstash-6.2.4/modules/fb_apache/configuration"}
[2018-10-22T10:45:29,185][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/apps/tomcat/elk/ELK/logstash-6.2.4/modules/netflow/configuration"}
[2018-10-22T10:45:29,766][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-10-22T10:45:30,475][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-10-22T10:45:30,988][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
[2018-10-22T10:45:33,724][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch index=>"eeeeee", hosts=>[http://10.23.213.99:9201], document_type=>"schedules", document_id=>"%{user_id}", id=>"3ae27aa2f81ef6b55d6ba82673fdd009084fa821576cdc2f39f5bb80bd994f19", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_228763b2-647f-435e-9951-40326c5620b6", enable_metric=>true, charset=>"UTF-8">, workers=>1, manage_template=>true, template_name=>"logstash", template_overwrite=>false, doc_as_upsert=>false, script_type=>"inline", script_lang=>"painless", script_var_name=>"event", scripted_upsert=>false, retry_initial_interval=>2, retry_max_interval=>64, retry_on_conflict=>1, action=>"index", ssl_certificate_verification=>true, sniffing=>false, sniffing_delay=>5, timeout=>60, pool_max=>1000, pool_max_per_route=>100, resurrect_delay=>5, validate_after_inactivity=>10000, http_compression=>false>}
[2018-10-22T10:45:33,829][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5}
[2018-10-22T10:45:34,396][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://10.23.213.99:9201/]}}
[2018-10-22T10:45:34,410][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://10.23.213.99:9201/, :path=>"/"}
[2018-10-22T10:45:34,710][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://10.23.213.99:9201/"}
[2018-10-22T10:45:34,798][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-10-22T10:45:34,803][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-10-22T10:45:34,820][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-10-22T10:45:34,840][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"default"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-10-22T10:45:34,897][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://10.23.213.99:9201"]}
[2018-10-22T10:45:35,124][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x634438b5 sleep>"}
[2018-10-22T10:45:35,244][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-10-22T10:46:01,851][INFO ][logstash.inputs.jdbc ] (0.087609s) SELECT * from thetable WHERE user_id > 0 ORDER by user_id
[2018-10-22T10:47:00,262][INFO ][logstash.inputs.jdbc ] (0.005313s) SELECT * from thetable WHERE user_id > 2 ORDER by user_id
[2018-10-22T10:48:00,342][INFO ][logstash.inputs.jdbc ] (0.001482s) SELECT * from thetable WHERE user_id > 2 ORDER by user_id
[2018-10-22T10:49:01,159][INFO ][logstash.inputs.jdbc ] (0.014202s) SELECT * from thetable WHERE user_id > 2 ORDER by user_id
The same line keeps repeating every minute.

The table has only 2 rows at the start, but when I add more rows, not only do they not appear in the Kibana index, sometimes they are removed from the table itself.

I am relatively new to the ELK stack and would be really thankful if someone could help me.


#2

Each minute, you see one of these lines on the command line:

[2018-10-22T10:49:01,159][INFO ][logstash.inputs.jdbc ] (0.014202s) SELECT * from thetable WHERE user_id > 2 ORDER by user_id

So it seems logstash is checking for new records.
Does it pick up new database records when you insert new ones?
When do the records get deleted? Is it when logstash is checking for new records? I doubt it.
Maybe something else is deleting records before they get picked up?

It should not affect your current problem, but fix this warning before you use this in production:

[2018-10-22T10:45:33,724][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch index=>"eeeeee", hosts=>[http://10.23.213.99:9201], document_type=>"schedules", document_id=>"%{user_id}", id=>"3ae27aa2f81ef6b55d6ba82673fdd009084fa821576cdc2f39f5bb80bd994f19", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_228763b2-647f-435e-9951-40326c5620b6", enable_metric=>true, charset=>"UTF-8">, workers=>1, manage_template=>true, template_name=>"logstash", template_overwrite=>false, doc_as_upsert=>false, script_type=>"inline", script_lang=>"painless", script_var_name=>"event", scripted_upsert=>false, retry_initial_interval=>2, retry_max_interval=>64, retry_on_conflict=>1, action=>"index", ssl_certificate_verification=>true, sniffing=>false, sniffing_delay=>5, timeout=>60, pool_max=>1000, pool_max_per_route=>100, resurrect_delay=>5, validate_after_inactivity=>10000, http_compression=>false>}
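
For reference, a minimal sketch of the output block without the deprecated setting. It assumes the index name, host, and document_id from the config in the first post stay the same; it has not been run against this cluster.

```logstash
output {
    elasticsearch {
        hosts => ["http://localhost:9201"]
        index => "eeeeee"
        # document_type is left out on purpose: the setting is optional,
        # and omitting it makes the deprecation warning go away.
        document_id => "%{user_id}"
    }
}
```

Since document_id is still taken from user_id, a row that is read again overwrites its existing document instead of creating a duplicate.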

(Jatin) #3

Hello, thanks for your reply.

No, it does not pick up new records. The same console line appears every time. Since I inserted 2 new records, the value should have been 4, because that would be the last row. The new records do not show up in Kibana either.

I think they get deleted after I stop Logstash on my VM and restart it with a changed index name or some other tweak to the original logstash.conf file. But the database table remains the same.

Regarding the warning: I did google it, and people say that in the current version it should not cause any trouble, so I will leave it as is for a while.

Moreover, I stopped everything and tried running it again. Now the query in the Logstash logs shows 0 instead of 2. Worse, the index does not appear in Elasticsearch or Kibana, so I do not even have the data from the first run. :frowning:


(Jatin) #4

To add to the "moreover" part: as I mentioned, I ran the same log.conf with only the index name and document_type changed; the rest stayed the same.
I just checked and the "mytable" dummy table is empty now. It had 3 rows, all gone.
Also, I cannot even drop that table; it says "Interested source is busy".

Please help me find what is happening.


#5

Why have you specified the output stdout { codec => json_lines }? Try rubydebug instead; it is easier to read while debugging.

I tried a minimal working example myself and it is working fine.
I did my test with SQL Server. I do not have an Oracle DB lying around.

DB setup:

CREATE TABLE [dbo].[thetable](
    [user_id] [int] IDENTITY(1,1) NOT NULL,
    [text] [nvarchar](50) NULL
) ON [PRIMARY]
INSERT INTO [dbo].[thetable]([text]) VALUES('a')
INSERT INTO [dbo].[thetable]([text]) VALUES('b')
INSERT INTO [dbo].[thetable]([text]) VALUES('c')

Logstash conf:

input {
    jdbc {
        jdbc_driver_library => "C:\ProgramFiles\ms-jdbc-driver-6.0\enu\jre8\sqljdbc42.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://sql-server\inst:56000"
        jdbc_user => "user"
        jdbc_password => "password"
        schedule => "* * * * *"
        statement => "SELECT * from [dbo].[thetable] WHERE user_id > :sql_last_value ORDER by user_id"
        use_column_value => true
        tracking_column => user_id
        tracking_column_type => "numeric"
        clean_run => true
        record_last_run => true
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

Output is:

(0.049986s) SELECT * from [dbo].[thetable] WHERE user_id > 0 ORDER by user_id
{
      "@version" => "1",
       "user_id" => 1,
          "text" => "a",
    "@timestamp" => 2018-10-23T07:49:03.136Z
}
{
      "@version" => "1",
       "user_id" => 2,
          "text" => "b",
    "@timestamp" => 2018-10-23T07:49:03.145Z
}
{
      "@version" => "1",
       "user_id" => 3,
          "text" => "c",
    "@timestamp" => 2018-10-23T07:49:03.145Z
}

When I insert 2 new records, I get:

(0.001570s) SELECT * from [dbo].[thetable] WHERE user_id > 3 ORDER by user_id
{
      "@version" => "1",
       "user_id" => 4,
          "text" => "d",
    "@timestamp" => 2018-10-23T07:50:00.207Z
}
{
      "@version" => "1",
       "user_id" => 5,
          "text" => "e",
    "@timestamp" => 2018-10-23T07:50:00.207Z
}

The only thing I omitted is the last_run_metadata_path => "/path/.logstash_jdbc_last_run" part.
It seems your config is basically working.
Try removing the last_run_metadata_path, removing all outputs except the rubydebug one, and expand from there.
Whatever is deleting your records, I expect it to be something other than Logstash.
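
As a hedged sketch of that stripped-down setup for the Oracle case: the jdbc settings below are copied from the config in the first post, the connection string and credentials are placeholders, and stdout with rubydebug is the only output. It is untested (no Oracle instance was available here), so treat it as a starting point and not as a verified configuration.

```logstash
input {
    jdbc {
        # Placeholder connection details; replace with the real values.
        jdbc_connection_string => "jdbc:oracle:thin:@hostname:port/XE"
        jdbc_user => "username"
        jdbc_password => "password"
        jdbc_driver_library => "/path/mybox/KibanaDbTry/ojdbc7.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        schedule => "* * * * *"
        statement => "SELECT * from thetable WHERE user_id > :sql_last_value ORDER by user_id"
        use_column_value => true
        tracking_column => "user_id"
        tracking_column_type => "numeric"
        # clean_run => true discards the saved tracking value at startup,
        # so every restart begins again from user_id > 0.
        clean_run => true
        # last_run_metadata_path is omitted, so the plugin uses its default location.
    }
}
output {
    # Only output while debugging: print each event in readable form.
    stdout { codec => rubydebug }
}
```

If newly inserted rows show up here within a minute or so, add the elasticsearch output back and check again.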


(Jatin) #6

Hi redX,

Thank you for your answer. I tried exactly what you suggested.
I commented out the last_run_metadata_path parameter and changed the codec to rubydebug.
I also changed the index name and the document_type name.
Unfortunately the result is still the same. :cry:

This is the config I ran:

input {
    jdbc {
        jdbc_connection_string => "jdbc:oracle:thin:@host:port/XE"
        jdbc_user => "user"
        jdbc_password => "pass"
        jdbc_driver_library => "/path/JatinSandbox/KibanaDbTry/ojdbc7.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        #jdbc_validate_connection => true
        #statement => "select * from mytable"
        statement => "SELECT * from mtable WHERE user_id > :sql_last_value ORDER by user_id"
        schedule => "* * * * *"
        use_column_value => true
        tracking_column => "user_id"
        tracking_column_type => "numeric"
        clean_run => true
        record_last_run => true
        #last_run_metadata_path => "/home/toms/.logstash_jdbc_last_run"
    }
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        index => "batchdatas"
        hosts => "hostname"
        document_type => "schedulemine"
        document_id => "%{user_id}"
    }
}

The output is the same. Logstash runs the statement again and again with 0 as the sql_last_value, and the index is not listed in Elasticsearch or Kibana.

[2018-10-23T08:40:08,802][INFO ][logstash.inputs.jdbc     ] (0.072629s) SELECT * from mtable WHERE user_id > 0 ORDER by user_id
[2018-10-23T08:41:00,160][INFO ][logstash.inputs.jdbc     ] (0.002166s) SELECT * from mtable WHERE user_id > 0 ORDER by user_id
[2018-10-23T08:42:00,348][INFO ][logstash.inputs.jdbc     ] (0.001678s) SELECT * from mtable WHERE user_id > 0 ORDER by user_id

#7

Please give me your database table structure and some insert statements to reproduce the problem.

If you execute the query SELECT * from mtable WHERE user_id > 0 ORDER by user_id manually against your database, do you get records?


(Jatin) #8

It's simple, actually. I am trying this out on a dummy table before I use it on the real table, because I don't want real data to be deleted.

Create table mytable(user_id int, names varchar(20));

Then come the insert statements below. I run the first 3 before starting Logstash and the remaining 2 after Logstash is running, to confirm the schedule does its job.

insert into mytable(user_id, names) values(1,'sham');
insert into mytable(user_id, names) values(2,'shama');
insert into mytable(user_id, names) values(3,'shamb');
insert into mytable(user_id, names) values(4,'shamc');
insert into mytable(user_id, names) values(5,'shamd');

About the statement parameter in the config file: I did get a result on Friday, where the first 3 records were present but not the remaining 2 that I inserted after starting Logstash.
But since yesterday I cannot even see the index in ES.


#9

If you are using an Oracle DB to load the data into ES, use schedule => "*/2 * * * *" to pick up changes occurring in the database.
This runs the query every 2 minutes.
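
A rough sketch of where that setting goes, assuming the Oracle connection details and the table from earlier in this thread (all of them placeholders). Only the schedule line differs from the configs posted above, and the sketch has not been run against a real database.

```logstash
input {
    jdbc {
        # Placeholder connection details taken from earlier posts.
        jdbc_connection_string => "jdbc:oracle:thin:@hostname:port/XE"
        jdbc_user => "username"
        jdbc_password => "password"
        jdbc_driver_library => "/path/mybox/KibanaDbTry/ojdbc7.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        # Cron-style schedule: "*/2" in the minute field runs the query every 2 minutes.
        schedule => "*/2 * * * *"
        statement => "SELECT * from thetable WHERE user_id > :sql_last_value ORDER by user_id"
        use_column_value => true
        tracking_column => "user_id"
        tracking_column_type => "numeric"
    }
}
```

The schedule only controls how often the statement runs; which rows are returned still depends on :sql_last_value and the tracking column.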

