Hello,
Currently, we update our data in Elasticsearch by uploading a large batch from a relational database once in a while, which moves quite a lot of data. We would like to synchronize instead by uploading only new and edited records from this database, for example once every 10 minutes. For this we would like to use Logstash and JDBC. I am fairly new to Elasticsearch and especially new to Logstash, so I need some help.
"my index" should contain all the records that were uploaded after the previous tracking_column and the records that already existed. VOLGNR is in this case the specific id belonging to the record. I would like to assign VOLGNR to document_id which did not work during my multiple attempts. Next to that, we want to add inner joins. However, this results in an error, since it looks like it reads a count statement. How can I solve this?
[2019-10-21T09:09:46,310][ERROR][logstash.inputs.jdbc ][main] Java::JavaSql::SQLSyntaxErrorException: Dynamic SQL Error; SQL error code = -104; Token unknown - line 1, column 218; LIMIT [SQLState:42000, ISC error code:335544634]: SELECT count(*) AS "COUNT" FROM (SELECT * FROM OPBRNGST INNER JOIN KOPER ON OPBRNGST.KOPER = KOPER.KOPER_NR INNER JOIN PRODUKT ON OPBRNGST.PRODUKT = PRODUKT.PROD_NR WHERE VOLGNR > 1418300 ORDER BY VOLGNR ASC) AS "T1" LIMIT 1
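As far as I can tell, that count(*) ... LIMIT 1 statement is generated by Logstash around my own query, and LIMIT is not valid Firebird SQL. If I understand the Firebird dialect correctly, it uses ROWS (or FIRST) for row limiting, so a valid version of that probe would presumably have to look roughly like this (just a sketch of my understanding, with the ORDER BY dropped since it does not matter for a count):

SELECT count(*) AS "COUNT"
FROM (SELECT * FROM OPBRNGST
        INNER JOIN KOPER ON OPBRNGST.KOPER = KOPER.KOPER_NR
        INNER JOIN PRODUKT ON OPBRNGST.PRODUKT = PRODUKT.PROD_NR
        WHERE VOLGNR > 1418300) AS "T1"
ROWS 1

I did set jdbc_paging_enabled to false in the config below, yet this probe still seems to run, which is part of my confusion.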
Based on https://www.elastic.co/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash, we came up with this config file:
input {
  jdbc {
    jdbc_driver_library => "c:/bug/jaybird-full-3.0.6.jar"
    jdbc_driver_class => "org.firebirdsql.jdbc.FBDriver"
    jdbc_connection_string => "jdbc:firebirdsql://path"
    jdbc_user => <my username>
    jdbc_password => <my password>
    jdbc_paging_enabled => false
    jdbc_page_size => "50000"
    jdbc_fetch_size => 50000
    tracking_column => "VOLGNR"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT * FROM OPBRNGST INNER JOIN KOPER ON OPBRNGST.KOPER = KOPER.KOPER_NR INNER JOIN PRODUKT ON OPBRNGST.PRODUKT = PRODUKT.PROD_NR WHERE VOLGNR > 1418300 ORDER BY VOLGNR ASC"
  }
}
filter {
  mutate {
    copy => { "VOLGNR" => "[@metadata][_id]" }
    remove_field => ["@version"]
  }
}
output {
  elasticsearch {
    hosts => <host>
    user => <my username>
    password => <my password>
    index => <my index>
    document_id => "%{[@metadata][_id]}"
  }
}
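For reference, this is the direction I think the blog post intends for the incremental part, with :sql_last_value in place of my hardcoded starting value. I also read that the jdbc input lowercases column names by default (the lowercase_column_names option), so presumably the tracking column and the field referenced in the mutate filter would need the lowercased name. A sketch of those pieces, in case my understanding is wrong:

tracking_column => "volgnr"
use_column_value => true
statement => "SELECT * FROM OPBRNGST INNER JOIN KOPER ON OPBRNGST.KOPER = KOPER.KOPER_NR INNER JOIN PRODUKT ON OPBRNGST.PRODUKT = PRODUKT.PROD_NR WHERE VOLGNR > :sql_last_value ORDER BY VOLGNR ASC"

filter {
  mutate {
    copy => { "volgnr" => "[@metadata][_id]" }
    remove_field => ["@version"]
  }
}

Is that the correct way to combine :sql_last_value with document_id, or am I missing something?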
Could you please help me?
Thanks!