I have a pipeline like this:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/postgresql-42.2.6.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "${JDBC_CONNECTION_STRING}"
    jdbc_user => "${JDBC_USER}"
    jdbc_password => "${JDBC_PASSWORD}"
    # Select everything from my_table and append a fake record
    # to indicate the end of the results.
    statement => "
      SELECT * FROM my_table
      UNION ALL
      SELECT 'END_OF_QUERY_RESULTS' AS my_key;
    "
  }
}
filter {
  ruby {
    init => "
      require 'net/http'
      require 'json'
    "
    code => '
      # When the fake end-of-results record arrives, call the API
      # and cancel the event so it is not written to MongoDB.
      if event.get("my_key") == "END_OF_QUERY_RESULTS"
        uri = URI.parse(ENV["MY_URL"])
        response = Net::HTTP.get_response(uri)
        if response.code == "202"
          result = JSON.parse(response.body)
          puts "Success!"
        else
          puts "ERROR: Could not start processing."
        end
        event.cancel
      end
    '
  }
}
output {
  mongodb {
    bulk => true
    bulk_interval => 2
    collection => "${MONGO_DB_COLLECTION}"
    database => "${MONGO_DB_NAME}"
    generateId => true
    uri => "mongodb://${MONGO_DB_HOST}:${MONGO_DB_PORT}/${MONGO_DB_NAME}"
  }
}
I simply grab all the data from a PostgreSQL table into a MongoDB collection.
What I'm trying to achieve is: I want to call an API after ALL the data has been loaded into the MongoDB collection.
What I tried:
- I added a fake record at the end of the SQL query results (the UNION ALL above) to use as a flag marking the last event. The problem with this approach is that Logstash does not maintain the order of events, so the event carrying the 'END_OF_QUERY_RESULTS' string can reach the filter before the actual last record does.
- I set pipeline.workers: 1 and pipeline.ordered: true (see the snippet after this list); neither seems to help.
- I slept for a while in the Ruby filter (see the sketch after this list); it works, but I don't/can't really know how long I should sleep.