Hello All,
I am facing an issue while pulling data from a SQL database. It appears that the sequential document population, which is based on an ID, sometimes happens out of sequence... Let me explain with an actual example.
My dataset contains around 25k records. It holds transactional data with the transaction date as the tracking column. I'm using a fingerprint as the _id, so a document is overwritten whenever the same set of columns occurs again. Here's a sample of my set:
Transaction Date | Status | Area | City | Street | Building |
---|---|---|---|---|---|
01.01.2019 10:00 | NEW | Manhattan | New York | Columbus Ave | 609 |
04.04.2019 12:00 | DELETE | Manhattan | New York | Columbus Ave | 609 |
For the fingerprint, I'm using a combination of these columns: Area, City, Street, Building, and this is what I have in my _id field.
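To make it concrete: both sample rows above produce the same fingerprint source string, so they should resolve to the same _id (I'm only showing the source string here, not the actual MURMUR3 hash):

fingerprint source: Manhattan_New York_Columbus Ave_609   (identical for the NEW and the DELETE row)
_id: MURMUR3 hash of that string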
Query:
SELECT
  Transaction_date, Status, Area, City, Street, Building
FROM
  Buildings
WHERE Transaction_date >= :sql_last_value
ORDER BY Transaction_date ASC
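For clarity on the incremental part: as I understand it, on a scheduled run the plugin replaces :sql_last_value with the transaction_date stored from the previous run, so the WHERE clause effectively becomes something like:

WHERE Transaction_date >= '2019-01-01 10:00:00.000'   -- example value, taken from the last run metadata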
I assume that with this logic, only the latest record, the one with the 'DELETE' status, should be visible in Elasticsearch. Yet that's not the case. I reloaded the entire set a few times, and it's almost always 'NEW' that persists in my index. I also tried to narrow it down to just those two rows with my query and removed the tracking_column setup, so that Logstash reloads the entire set on each run, and the result is also mostly 'NEW'. Sometimes 'DELETE' persists, but usually after the next pipeline run it goes back to 'NEW'.
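In other words, after both rows have been processed I expect a single surviving document that looks roughly like this (field names are lowercased by the jdbc input; the _id is just a placeholder for the actual MURMUR3 value):

{
  "_index": "buildings",
  "_id": "<murmur3 of Manhattan_New York_Columbus Ave_609>",
  "_source": {
    "transaction_date": "2019-04-04T12:00:00",
    "status": "DELETE",
    "area": "Manhattan",
    "city": "New York",
    "street": "Columbus Ave",
    "building": "609",
    "fingerprint": "Manhattan_New York_Columbus Ave_609"
  }
}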
Here's my config:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/lib/drivers/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://sql:1433"
    jdbc_user => "user"
    jdbc_password => "password"
    schedule => "*/5 * * * *"
    statement_filepath => "/etc/logstash/conf.d/queries/buildings.sql"
    # track the last seen transaction_date between runs (used as :sql_last_value)
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "transaction_date"
    last_run_metadata_path => "/usr/share/logstash/last_run_metadata/.buildings"
    record_last_run => true
  }
}
filter {
  mutate {
    # build the fingerprint source from the identifying columns
    add_field => { "fingerprint" => "%{area}_%{city}_%{street}_%{building}" }
  }
  fingerprint {
    source => "fingerprint"
    target => "[@metadata][fingerprint]"
    method => "MURMUR3"
  }
}
output {
  elasticsearch {
    hosts => ["elastic:9200"]
    index => "buildings"
    # the hashed fingerprint is the document_id, so rows with the same Area/City/Street/Building overwrite each other
    document_id => "%{[@metadata][fingerprint]}"
  }
}
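For reference, this is how I check which version ends up in the index after a run (assuming the default dynamic mapping, which gives the fingerprint field a keyword sub-field):

GET buildings/_search
{
  "query": {
    "term": {
      "fingerprint.keyword": "Manhattan_New York_Columbus Ave_609"
    }
  }
}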