Load data from SQL Server to a local Elasticsearch

I have a local SQL Server and a local ELK stack. I configured logstash.conf to connect to SQL Server. Logstash reads the data from SQL Server, but only 1 row is written to Elasticsearch.
Can anyone explain why this happens?
input {
  jdbc {
    jdbc_driver_library => "C:\ELK\elasticsearch-7.12.0-windows-x86_64\elasticsearch-7.12.0\lib\sqljdbc_9.2\enu\mssql-jdbc-9.2.1.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://BPHHP13:1433;databaseName=WRIPMS;"
    jdbc_user => "test"
    jdbc_password => "TuanTu2017@)!&"
    statement => "select countyId, countyName, modifiedDate from Counties"
  }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "counties_index"
    document_id => "%{CountyId}"
  }
  stdout {}
}
Thank you.

What is the output of

GET _cat/indices/?v

Also, you are setting the document _id to the CountyId. That may not be the best approach unless you are specifically trying to overwrite the documents; if so, you might want to use doc_as_upsert.
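
For reference, a minimal sketch of an upsert-style output. It assumes the key arrives in an event field named countyid; the real field name depends on the jdbc input settings, so check the stdout output first. As far as I know, doc_as_upsert only takes effect together with action => "update". With the default action, a repeated document_id simply replaces the whole document.

```conf
output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "counties_index"
    # Assumed field name; it must match the event field exactly (case sensitive).
    document_id => "%{countyid}"
    # Update the document with this id, or create it from the event if it is missing.
    action => "update"
    doc_as_upsert => true
  }
}
```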

Hi @stephenb.
I need to insert new rows and also overwrite existing data in Elasticsearch, so I used
document_id => "%{CountyId}".
So should I set doc_as_upsert to true?
Please help me with this case.

This is not a support case; we are volunteers. We will help if we can, and the more detail you provide, the better the chance that someone can help.

Hi @stephenb.
I configured doc_as_upsert => true in logstash.conf, but it doesn't work. My source table has 1900 rows, but only 1 row has been written to Elasticsearch.

input {
  jdbc {
    jdbc_driver_library => "C:\ELK\elasticsearch-7.12.0-windows-x86_64\elasticsearch-7.12.0\lib\sqljdbc_9.2\enu\mssql-jdbc-9.2.1.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://BPHHP13:1433;databaseName=WRIPMS;"
    jdbc_user => "test"
    jdbc_password => "TuanTu2017@)!&"
    jdbc_paging_enabled => true
    schedule => "*/5 * * * * *"
    statement => "select countyId, countyName, modifiedDate from Counties where countyId > :sql_last_value"
    use_column_value => true
    tracking_column => "countyId"
  }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "counties_index"
    document_id => "%{CountyId}"
    doc_as_upsert => true
  }
  stdout {
    codec => json_lines
  }
}
Thank you.

Field names are case sensitive. You have a column called [countyId], not [CountyId], so the reference is never substituted: the document_id will literally be the string %{CountyId}, and every document will overwrite the same one.
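
One way to check which field names actually reach the output is to print the events temporarily with the rubydebug codec and compare them with the %{...} reference used in document_id. This is only a debugging sketch, not a fix in itself.

```conf
output {
  # Print each event with its exact field names and values.
  # Compare the key's spelling and case with the reference in document_id.
  stdout {
    codec => rubydebug
  }
}
```

If the printed field name differs from the reference in any way, including case, every event ends up with the same literal _id.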


Hi @Badger.
I modified the config file following your guidance, but it still imports only 1 row into Elasticsearch.
Please help me review it.
input {
  jdbc {
    jdbc_driver_library => "C:\ELK\elasticsearch-7.12.0-windows-x86_64\elasticsearch-7.12.0\lib\sqljdbc_9.2\enu\mssql-jdbc-9.2.1.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://BPHHP13:1433;databaseName=WRIPMS;"
    jdbc_user => "test"
    jdbc_password => "TuanTu2017@)!&"
    jdbc_paging_enabled => true
    schedule => "*/5 * * * * *"
    statement => "select countyId, countyName, modifiedDate from Counties where countyId > :sql_last_value"
    use_column_value => true
    tracking_column => "countyId"
  }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "counties_index"
    document_id => "%{countyId}"
    doc_as_upsert => true
  }
  stdout {
    codec => json_lines
  }
}
Thank you.

You are using a tracking_column, so only new data will be added. Try running once with clean_run => true.
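
A sketch of where the option goes, with the other settings omitted. As I understand it, clean_run discards the stored :sql_last_value when the pipeline starts, so it is normally removed again once the initial load has completed.

```conf
input {
  jdbc {
    # ... driver, connection, statement and tracking settings unchanged ...
    # Discard the stored :sql_last_value so the query starts from the beginning.
    clean_run => true
  }
}
```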

Hi @Badger.
I tried running with clean_run => true, but it still doesn't work. However, when I remove document_id, all the data is loaded into Elasticsearch. My table has 1987 rows.
What I need is that, when new records are added, only the rows that do not already exist in Elasticsearch are inserted.
This is my config file.
input {
  jdbc {
    jdbc_driver_library => "C:\ELK\elasticsearch-7.12.0-windows-x86_64\elasticsearch-7.12.0\lib\sqljdbc_9.2\enu\mssql-jdbc-9.2.1.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://BPHHP13:1433;databaseName=WRIPMS;"
    jdbc_user => "test"
    jdbc_password => "TuanTu2017@)!&"
    jdbc_paging_enabled => true
    clean_run => true
    schedule => "*/5 * * * * *"
    statement => "select [countyId], [countyName], [modifiedDate] from Counties where [countyId] > :sql_last_value"
    use_column_value => true
    tracking_column => "countyId"
  }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "counties_index"
    document_id => "%{countyId}"
    doc_as_upsert => true
  }
  stdout {
    codec => rubydebug
  }
}
Thank you.

As noted in another question you asked about this, because lowercase_column_names defaults to true, you do not have a field called countyId, but instead countyid.
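
A sketch of what that means for the config, with the driver and connection settings omitted. It assumes the default lowercase_column_names => true is kept, so both the tracking column and the document_id reference use the lowercased name.

```conf
input {
  jdbc {
    # ... driver and connection settings unchanged ...
    statement => "select countyId, countyName, modifiedDate from Counties where countyId > :sql_last_value"
    use_column_value => true
    # With the default lowercase_column_names => true the event field is "countyid".
    tracking_column => "countyid"
  }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "counties_index"
    # Matches the lowercased event field, so each row gets its own _id.
    document_id => "%{countyid}"
  }
}
```

The alternative is to set lowercase_column_names => false in the jdbc block and keep the casing returned by the query, in which case the references would stay as countyId.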
