Enforcing events to be output in order

I have a pipeline like this:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/postgresql-42.2.6.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "${JDBC_CONNECTION_STRING}"
    jdbc_user => "${JDBC_USER}"
    jdbc_password => "${JDBC_PASSWORD}"
    statement =>
        # Select everything from my_table and
        # add a fake record to indicate the end of the results.
        "SELECT * FROM my_table
         UNION ALL
         -- sentinel row; in practice padded with NULLs to match my_table's columns
         SELECT 'END_OF_QUERY_RESULTS' AS some_key"
  }
}

filter {
  ruby {
    init => "
      require 'net/http'
      require 'json'
    "
    code => '
      if event.get("some_key") == "END_OF_QUERY_RESULTS"
        uri = URI.parse(ENV["MY_URL"])
        response = Net::HTTP.get_response(uri)
        result = JSON.parse(response.body)

        if response.code == "202"
          puts "Success!"
        else
          puts "ERROR: Couldn\'t start processing."
        end
      end
    '
  }
}

output {
  mongodb {
    bulk => true
    bulk_interval => 2
    collection => "${MONGO_DB_COLLECTION}"
    database => "${MONGO_DB_NAME}"
    generateId => true
    uri => "mongodb://${MONGO_DB_HOST}:${MONGO_DB_PORT}/${MONGO_DB_NAME}"
  }
}

I simply copy all the data from a PostgreSQL table into a MongoDB collection.

What I'm trying to achieve: I want to call an API after ALL the data has been loaded into the MongoDB collection.

What I tried:

  • I tried the approach shown above: appending a fake record to the end of the SQL query results to serve as a flag marking the last event. The problem with this approach is that Logstash does not maintain the order of events, so the event carrying the 'END_OF_QUERY_RESULTS' string can reach the filter before the events that precede it in the query results.
  • I set pipeline.workers: 1 and pipeline.ordered: true; neither seems to work.
  • I tried sleeping for a while in the Ruby filter, and it works, but I don't/can't know how long I should sleep.
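
For what it's worth, the sentinel check can be exercised outside Logstash. Below is a minimal plain-Ruby sketch of the intended logic (the names `process_events` and `on_complete` are hypothetical, not Logstash APIs): forward ordinary events, and fire a completion callback once when the sentinel arrives.

```ruby
# Hypothetical stand-in for the Ruby-filter logic: iterate events in
# arrival order, buffer ordinary ones, and invoke the callback when the
# END_OF_QUERY_RESULTS sentinel shows up. This only behaves correctly if
# event order is preserved, which is exactly the guarantee the pipeline
# needs to provide.
SENTINEL = "END_OF_QUERY_RESULTS".freeze

def process_events(events, &on_complete)
  loaded = []
  events.each do |event|
    if event["some_key"] == SENTINEL
      on_complete.call(loaded.length)  # e.g. call the external API here
    else
      loaded << event                  # e.g. write to MongoDB here
    end
  end
  loaded
end
```

If the sentinel reaches the filter before the last real event (the reordering problem described above), the callback fires too early, which is the failure mode being observed.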

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.