Continuous Data from SQL Server into Logstash

I am using Logstash to load data from SQL Server into Elasticsearch.

I run a query through Logstash, the index is created, and I am able to visualize the data in Kibana.

My question is how to stream data continuously from SQL Server to Logstash, so that whenever a record is updated in SQL Server, that record is also updated in Elasticsearch via Logstash.

Is this possible?

Hi, does anyone have an idea about the above issue?

The Logstash JDBC input plugin retrieves data through queries and supports a tracking column, which you can use to fetch only new or updated records, assuming you can write a suitable query that uses it. If your data has, for example, an updated timestamp, you can use that as the tracking column and select only records with timestamps greater than the value recorded when the query last ran, picking up only new or updated rows.
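For illustration, here is a minimal sketch of such a JDBC input; the table name (mytable), the modified_at timestamp column, and the connection details are assumptions for the example, not something from this thread:

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://localhost;databaseName=mydb;"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_driver_library => "/path/to/sqljdbc42.jar"
    jdbc_user => "user"
    jdbc_password => "password"
    # Only fetch rows modified since the last successful run.
    statement => "SELECT id, name, modified_at FROM mytable WHERE modified_at > :sql_last_value"
    use_column_value => true
    tracking_column => "modified_at"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/path/.logstash_jdbc_last_run"
    # Run the query once a minute.
    schedule => "* * * * *"
  }
}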

Actually, I have used the Logstash configuration below.

But every time this file runs, it re-indexes everything from the start instead of picking up only new or updated records.

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://XXXXX;databaseName=XXX;integratedSecurity=false;"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_user => "XXX"
    jdbc_password => "XXXX"
    jdbc_driver_library => "/path/enu/jre8/sqljdbc42.jar"
    statement => "SELECT id,name FROM TestTable where id > :sql_last_value"
    tracking_column => "id"
    use_column_value => true
    clean_run => true
    last_run_metadata_path => "/path/.logstash_jdbc_last_run"
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "testindex"
  }
  stdout { }
}

Not sure what is missing.

Any help, @Christian_Dahlqvist?

Use the action => "update" option in the elasticsearch output.
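As an illustration (not from the original poster's configuration), a sketch of an elasticsearch output that updates existing documents instead of appending duplicates; using the row's id as the document_id and adding doc_as_upsert are assumptions made for this example:

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "testindex"
    # Reuse the primary key as the document id so a re-indexed row
    # overwrites the existing document instead of creating a new one.
    document_id => "%{id}"
    action => "update"
    # Create the document if it does not exist yet.
    doc_as_upsert => true
  }
}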

You do have clean_run set to true, which is likely to cause this: with clean_run enabled, the saved sql_last_value is reset on every run, so the query starts from scratch and re-reads the whole table.

Hi mubarak_shaik,

Can you please provide the correct configuration for this issue, as I am facing the same problem?

Thanks in advance.

Hi @balumurari1,

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://XXXXX;databaseName=XXXX;integratedSecurity=false;"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_user => "XXXX"
    jdbc_password => "XXX"
    jdbc_driver_library => "/path/jre8/sqljdbc42.jar"
    # Only select rows with an id greater than the value saved from the last run.
    statement => "SELECT id,name FROM sampletable where id > :sql_last_value"
    tracking_column => "id"
    use_column_value => true
    last_run_metadata_path => "/path/.logstash_jdbc_last_run"
    # Run the query every 5 seconds.
    schedule => "*/5 * * * * *"
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "sampleindex"
  }
  stdout { }
}

Hello @mubarak_shaik,
Thanks a lot for your reply.

When I run the above configuration, I observe that my .logstash_jdbc_last_run file stays empty.
I am confused as to why it is empty.

The console output is as shown in the image.

The input configuration is as follows:

input {
  jdbc {
    jdbc_driver_library => "D:/Softwares/logstash/lib/com.mysql.jdbc_5.1.5.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
    jdbc_user => "root"
    jdbc_password => "root"
    statement => "SELECT id,name FROM sample where id > :sql_last_value"
    tracking_column => "id"
    use_column_value => true
    last_run_metadata_path => "C:/Users/XXXXXX/.logstash_jdbc_last_run"
    schedule => "*/1 * * * *"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "idx_mysqldata"
  }
  stdout { codec => rubydebug }
}

Please correct me if I am wrong.
Thanks in advance.

Hi,

Please first try with a fixed id value in the WHERE condition.

input {
  jdbc {
    jdbc_driver_library => "D:/Softwares/logstash/lib/com.mysql.jdbc_5.1.5.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
    jdbc_user => "root"
    jdbc_password => "root"
    statement => "SELECT id,name FROM sample where id > 1"
    tracking_column => "id"
    last_run_metadata_path => "C:/Users/XXXXXX/.logstash_jdbc_last_run"
    schedule => "*/1 * * * *"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "idx_mysqldata"
  }
  stdout { codec => rubydebug }
}

Then check your results first.

Hi mubarak_shaik,

Thanks for your response. It works as shown below for id > 1.
I have 3 records with id = 1, 2, 3 in the database.
Every minute, additional documents keep getting added to Elasticsearch:

after the 1st minute: 2 hits with id = 2, 3 (total hits: 2)
after the 2nd minute: 4 hits with id = 2, 3, 2, 3 (total hits: 4)
after the 3rd minute: 6 hits with id = 2, 3, 2, 3, 2, 3 (total hits: 6)

Here, Elasticsearch is being filled up with duplicate records, but what I need is for any updated or newly added records in the database to be reflected in Elasticsearch.

Please help me. Thanks in advance.

Hi @balumurari1,

Please delete the .logstash_jdbc_last_run file, then follow the steps below.

First, execute this configuration once to load the initial data:

input {
  jdbc {
    jdbc_driver_library => "D:/Softwares/logstash/lib/com.mysql.jdbc_5.1.5.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
    jdbc_user => "root"
    jdbc_password => "root"
    # One-off initial load: no schedule, fixed starting id.
    statement => "SELECT id,name FROM sample where id > 1"
    use_column_value => true
    tracking_column => "id"
    last_run_metadata_path => "C:/Users/XXXXXX/.logstash_jdbc_last_run"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "idx_mysqldata"
  }
  stdout { codec => rubydebug }
}

Now that you have the initial data, execute the configuration below:

input {
  jdbc {
    jdbc_driver_library => "D:/Softwares/logstash/lib/com.mysql.jdbc_5.1.5.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
    jdbc_user => "root"
    jdbc_password => "root"
    # From now on, only fetch rows with an id above the value saved from the previous run.
    statement => "SELECT id,name FROM sample where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    last_run_metadata_path => "C:/Users/XXXXXX/.logstash_jdbc_last_run"
    # Run the query every 5 seconds.
    schedule => "*/5 * * * * *"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "idx_mysqldata"
  }
  stdout { codec => rubydebug }
}

Hi mubarak_shaik,

Thanks a lot, it works fine now.
With this, any new rows inserted into the database are indexed into Elasticsearch.

Can you please help me with one more thing:
what is the scenario when we update values in existing rows, which must then be updated in Elasticsearch as well?
Example:
Table name: Sample
Columns: (id, name)
For id = 1 we have name = 'Ram', both in the DB and in ES.

Suppose that in the database I update name = 'Ramu' for id = 1. How do I reflect that change in ES?

Thanks in advance.

You need to have a column that you can use to identify the change, e.g. a modified timestamp, and then use that as the tracking column together with the sql_last_value parameter.
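As a sketch only: assuming the sample table had a modified_at timestamp column that is updated on every change (that column is an assumption, it is not in the thread's table), the input could track it and the output could upsert by id so updated rows overwrite their existing documents:

input {
  jdbc {
    jdbc_driver_library => "D:/Softwares/logstash/lib/com.mysql.jdbc_5.1.5.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
    jdbc_user => "root"
    jdbc_password => "root"
    # modified_at is an assumed column maintained by the application or a trigger.
    statement => "SELECT id, name, modified_at FROM sample WHERE modified_at > :sql_last_value"
    use_column_value => true
    tracking_column => "modified_at"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "C:/Users/XXXXXX/.logstash_jdbc_last_run"
    schedule => "*/5 * * * * *"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "idx_mysqldata"
    # Use the primary key as the document id so an updated row replaces the old document.
    document_id => "%{id}"
    doc_as_upsert => true
    action => "update"
  }
  stdout { codec => rubydebug }
}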


Fine, thank you!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.