Run pipeline based on condition in another pipeline

I need to import new data from Oracle (as soon as batch processing starts) a couple of times a day.

E.g., my DBA team runs batch processes 5 times a day, at random times and with a varying number of rows (up to 1 million). They don't want me to run a SELECT against the database table (named TRANSACTIONS) every 5 minutes, but only once batch processing has started. That's why they created another table in Oracle (named SIGNAL) with a "notification" for me, so my Logstash pipeline knows when to start pulling data from TRANSACTIONS into Elasticsearch.

When they start a batch process (on table TRANSACTIONS), they add a new row to table SIGNAL: "TIMESTAMP - BATCH STARTED".
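For illustration, the inserted signal row might look like this (the SIGNAL schema isn't shown in this thread, so signal_ts and message are assumed column names):

    INSERT INTO signal (signal_ts, message)
    VALUES (SYSTIMESTAMP, 'BATCH STARTED');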

My plan is to run a Logstash pipeline against table SIGNAL every minute and check whether a new row containing the string "BATCH STARTED" has been created; if it has, I should run a second pipeline against table TRANSACTIONS. Is that possible in Logstash?
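Checking SIGNAL on a schedule looks straightforward with a jdbc input; here is a minimal sketch, again assuming SIGNAL has signal_ts and message columns:

    input {
      jdbc {
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@db12db:1543:db12"
        jdbc_user => "XXXXX"
        jdbc_password => "XXXXX"
        # Poll the SIGNAL table once a minute.
        schedule => "* * * * *"
        use_column_value => true
        tracking_column => "signal_ts"
        tracking_column_type => "timestamp"
        record_last_run => true
        last_run_metadata_path => "/etc/logstash/lastrun_signal"
        # Only fetch signal rows that appeared since the last check.
        statement => "SELECT signal_ts, message FROM signal WHERE message LIKE '%BATCH STARTED%' AND signal_ts > :sql_last_value"
      }
    }

The part I can't figure out is how to kick off the TRANSACTIONS pipeline from the events this produces.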

This is what my pipeline for table TRANSACTIONS looks like:

    input {
      jdbc {
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@db12db:1543:db12"
        jdbc_user => "XXXXX"
        jdbc_password => "XXXXX"
        use_column_value => true
        tracking_column => "order_id"
        #tracking_column_type => "numeric"
        #clean_run => true
        record_last_run => true
        schedule => "* * * * *"
        last_run_metadata_path => "/etc/logstash/lastrun_transactions"
        statement => "select p.order_id,to_char(P.date_value, 'yyyy-mm-dd') datum_valute,to_char(P.date_booked, 'yyyy-mm-dd') date_booked......
                      WHERE order_id > :sql_last_value ORDER BY order_id"
      }
      # Can I add another input on another table here?
    }

    output {
      elasticsearch {
        hosts => ["https://localhost:9200"]
        user => "XXX"
        password => "XXX"
        index => "transactions__%{[@metadata][index_year]}"
        document_id => "%{order_id}"
        ssl => true
        cacert => '/etc/logstash/XXXRootCAv4.cer.pem'
      }
    }

You cannot trigger an input based on another input, but you could trigger a jdbc_streaming filter based on the events generated by an input. It's not a great fit for this use case, but you might be able to get it to work.
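A rough sketch of what that could look like, bolted onto a pipeline whose input polls SIGNAL as described above (column names are assumptions; note that jdbc_streaming has no :sql_last_value bookkeeping, so this parameterizes the TRANSACTIONS query on the signal row's timestamp instead, assuming the batch sets date_booked):

    filter {
      # Each event from the SIGNAL input represents one "BATCH STARTED" row;
      # use it to pull the matching TRANSACTIONS rows within the same pipeline.
      jdbc_streaming {
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@db12db:1543:db12"
        jdbc_user => "XXXXX"
        jdbc_password => "XXXXX"
        statement => "SELECT order_id, to_char(date_booked, 'yyyy-mm-dd') date_booked FROM transactions WHERE date_booked >= :started ORDER BY order_id"
        # Bind :started to the signal_ts field of the triggering event.
        parameters => { "started" => "signal_ts" }
        target => "transactions"
      }
      # jdbc_streaming puts the whole result set into one array field;
      # emit one event per TRANSACTIONS row instead.
      split { field => "transactions" }
    }

After the split, each row's columns live under [transactions], so the document_id in your output would need to become "%{[transactions][order_id]}".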

P.S. Do you want to trigger on batch started or batch ended?

I want to trigger when the batch starts.

OK, but that seems likely to lead to race conditions where you are trying to fetch rows before the batch processing has finished updating them, so you will not fetch them until the next day.
