Sequential record pull from JDBC to Elasticsearch using Logstash is out of sequence

Hello All,

I am facing an issue while pulling data from a SQL database. It appears that the sequential document population, which is based on an ID, is sometimes out of sequence... Let me explain with an actual example.
My dataset contains around 25k records. It is transactional data with the transaction date as the tracking column. I'm using the fingerprint filter to generate the _id, so a document gets overwritten whenever the same combination of column values occurs again. Here's a sample of my data set:

Transaction Date | Status | Area | City | Street | Building
01.01.2019 10:00 | NEW | Manhattan | New York | Columbus Ave | 609
04.04.2019 12:00 | DELETE | Manhattan | New York | Columbus Ave | 609

For the fingerprint, I'm using the combination of these columns: Area, City, Street, Building, and that's what I have in my _id field. With the sample rows above, both records produce the same source string ('Manhattan_New York_Columbus Ave_609'), so they hash to the same _id and the record processed last should simply overwrite the other.

Query:

SELECT
Transaction_date, Status, Area, City, Street, Building
FROM
Buildings
WHERE Transaction_date >= :sql_last_value
ORDER BY Transaction_date ASC

I assume that with this logic only the latest record, the one with 'DELETE' status, should be visible in Elasticsearch. Yet that's not the case. I tried to reload the entire set a few times, and it's almost always 'NEW' that persists in my index. I also narrowed the query down to just those two records and removed the tracking_column setup, so that Logstash reloads the entire set on each run, and the result is still mostly 'NEW'. Sometimes 'DELETE' persists, but usually after the next pipeline run it goes back to 'NEW'.
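
A quick way to see which version ended up in the index after a run is to search for the two records directly (column names come through lowercased by the jdbc input, so the fields are status, street, etc.):

GET buildings/_search
{
  "query": {
    "match": { "street": "Columbus Ave" }
  }
}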

Here's my config:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/lib/drivers/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://sql:1433"
    jdbc_user => "user"
    jdbc_password => "password"
    schedule => "*/5 * * * *"
    statement_filepath => "/etc/logstash/conf.d/queries/buildings.sql"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "transaction_date"
    last_run_metadata_path => "/usr/share/logstash/last_run_metadata/.buildings"
    record_last_run => true
  }
}

filter {
  mutate {
    add_field => { "fingerprint" => "%{area}_%{city}_%{street}_%{building}" }
  }
  fingerprint {
    source => "fingerprint"
    target => "[@metadata][fingerprint]"
    method => "MURMUR3"
  }
}

output {
  elasticsearch {
    hosts => ["elastic:9200"]
    index => "buildings"
    document_id => "%{[@metadata][fingerprint]}"
  }
}

Are you using '--pipeline.workers 1'? If not, you will definitely get data out of order. Even with that, I do not think order is guaranteed.
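
For example, something along these lines (the config path is just a placeholder):

bin/logstash -f /etc/logstash/conf.d/buildings.conf --pipeline.workers 1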

Hello @Badger,

Thanks for the suggestion! I've been using the default value (which was 125). I changed it to 1, checked a few cases, and it seems to sequence the records properly. I will have to conduct more detailed tests early next week.
As an additional question, is there any mechanism or best practice that would ensure proper sequencing during the first load of data?

--pipeline.workers 1 is all there is.
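
If you want it to stick rather than passing the flag on every run, the same setting can go into pipelines.yml for that pipeline (the id and path below are illustrative):

- pipeline.id: buildings
  path.config: "/etc/logstash/conf.d/buildings.conf"
  pipeline.workers: 1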

Got it, thank you.
