Is it possible to add condition in elasticsearch logstash jdbc input plugin

Hi Experts,

Is it possible to add condition in elasticsearch logstash jdbc input plugin for the use case mentioned below -

Usecase -

  • Select data from table1
  • If datetime (refer elapseTime in code mentioned below) fetched from table1 is less than current dateTime
  • Select data from table2
input{
  jdbc {
    id => "test"
    jdbc_driver_library => "mssql-jdbc-7.4.1.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://test:3333"
    jdbc_user => "test"
    jdbc_password => "test"
    schedule => "* * * * *"
    jdbc_default_timezone => "IST"
    statement => "
        SELECT  last_started,next_start_time
                DATEDIFF(s,next_start_time,last_started) elapseTime
                FROM table1 with (nolock)               
      "
  }
   if [jobElapseTime] > get.currentDatetime
    jdbc {
    id => "test"
    jdbc_driver_library => "mssql-jdbc-7.4.1.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://test:3333"
    jdbc_user => "test"
    jdbc_password => "test"
    schedule => "* * * * *"
    jdbc_default_timezone => "IST"
    statement => "
        SELECT  last_started,next_start_time
                DATEDIFF(s,next_start_time,last_started) jobElapseTime
                FROM table2 with (nolock)               
      "
  }
}

Thanks
Nivedita

You cannot reference fields from events in the input section because no events exist when this is configured. You can use a conditional in the filter section and call a jdbc_streaming filter to query table2.

Thanks Badger , I will try the above mentioned solution.

Hi @Badger ,

As advised above, I tried jdbc_streaming filter to query table 2 ; however I am getting below error while testing the pipeline -

ERROR

[ERROR] 2022-11-21 14:11:04.285 [[main]>worker0] javapipeline - Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(NoMethodError) undefined method `pop' for nil:NilClass", :exception=>Java::OrgJrubyExceptions::NoMethodError, :backtrace=>["usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.amazing_print_minus_1_dot_4_dot_0.lib.amazing_print.inspector.awesome(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/amazing_print-1.4.0/lib/amazing_print/inspector.rb:93)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.amazing_print_minus_1_dot_4_dot_0.lib.amazing_print.core_ext.kernel.ai(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/amazing_print-1.4.0/lib/amazing_print/core_ext/kernel.rb:11)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_rubydebug_minus_3_dot_1_dot_0.lib.logstash.codecs.rubydebug.encode_default(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-rubydebug-3.1.0/lib/logstash/codecs/rubydebug.rb:38)", "org.jruby.RubyMethod.call(org/jruby/RubyMethod.java:119)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_rubydebug_minus_3_dot_1_dot_0.lib.logstash.codecs.rubydebug.encode(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-rubydebug-3.1.0/lib/logstash/codecs/rubydebug.rb:34)", "usr.share.logstash.logstash_minus_core.lib.logstash.codecs.base.multi_encode(/usr/share/logstash/logstash-core/lib/logstash/codecs/base.rb:64)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1821)", "usr.share.logstash.logstash_minus_core.lib.logstash.codecs.base.multi_encode(/usr/share/logstash/logstash-core/lib/logstash/codecs/base.rb:64)", "usr.share.logstash.logstash_minus_core.lib.logstash.codecs.delegator.multi_encode(/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:55)", "org.logstash.instrument.metrics.AbstractSimpleMetricExt.time(org/logstash/instrument/metrics/AbstractSimpleMetricExt.java:65)", "org.logstash.instrument.metrics.AbstractNamespacedMetricExt.time(org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java:64)", "usr.share.logstash.logstash_minus_core.lib.logstash.codecs.delegator.multi_encode(/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:54)", "usr.share.logstash.logstash_minus_core.lib.logstash.outputs.base.multi_receive(/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:103)", "org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.multi_receive(org/logstash/config/ir/compiler/OutputStrategyExt.java:143)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:121)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:299)"], :thread=>"#<Thread:0x28230c91 sleep>"}

CODE

   jdbc_streaming {
    jdbc_driver_library => "/etc/logstash/drivers/jdbc/sqlServer/mssql-jdbc-7.4.1.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://****:****"
    jdbc_user => "dummy"
    jdbc_password => "dummy"
    statement =>"SELECT id , updated  , started , getdate() AS compareDateTime
                 FROM [table01] with (nolock) WHERE  compareDateTime > :nextstarttime"
    parameters => { "nextstarttime" => "next_start_time"}
    target => "details"
}

Could you please assist me with rror=>"(NoMethodError) undefined method `pop' for nil:NilClass ? I tried different SQL datetime methods for comaprision , but none of then are working.

Thanks in advance !
Niveidta

That is occurring (here) in the rubydebug codec of an output. It has nothing to do with the jdbc_streaming filter.

Thanks @Badger, I am just printing the output in the pipeline. Do I need define the codec in the output?

output {
 stdout { }
 }

The default codec for a stdout output is rubydebug, you could try changing that to json to get around this bug.

Thanks @Badger ,

The error "undefined method `pop' for nil:NilClass " is gone by defining the codec as json; however now I am getting new RubyDateTime missing Converter handling error.
The query is working fine with sql server; hence I am not sure which date Time fields needs converter.

[ERROR] 2022-11-22 09:35:33.734 [[main]>worker1] javapipeline - Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"Missing Converter handling for full class name=org.jruby.ext.date.RubyDateTime, simple name=RubyDateTime", :exception=>Java::OrgLogstash::MissingConverterException, :backtrace=>["org.logstash.Valuefier.fallbackConvert(Valuefier.java:118)", "org.logstash.Valuefier.convert(Valuefier.java:96)"
statement =>"SELECT id , updated  , started 
                        FROM [table01] with (nolock) 
                         WHERE  getdate() > :nextstarttime"

``