Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(TypeError) no implicit conversion of nil into Integer

I have configured SQS as the input and I am not sure why I am getting this error. I am not performing any type conversion in my Logstash config file.

Can anyone help identify the issue?

The error appears right after the "Successfully started Logstash API endpoint" message:

[2021-08-12T07:58:44,148][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-08-12T07:58:58,956][ERROR][logstash.javapipeline    ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(TypeError) no implicit conversion of nil into Integer", :exception=>Java::OrgJrubyExceptions::TypeError, :backtrace=>["usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)"], :thread=>"#<Thread:0x56308afb sleep>"}
[2021-08-12T07:58:58,998][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:00,005][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:01,008][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:02,011][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:03,013][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:04,017][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:05,020][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:06,022][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:07,025][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:08,027][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:20,004][ERROR][logstash.javapipeline    ][main] Pipeline error {:pipeline_id=>"main", :exception=>#<RuntimeError: Unable to stop input plugin(s)>, :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:365:in `block in monitor_inputs_and_workers'", "org/jruby/RubyKernel.java:1442:in `loop'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:336:in `monitor_inputs_and_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:195:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:137:in `block in start'"], "pipeline.sources"=>["/etc/logstash/conf.d/my_conf_file.conf"], :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:20,009][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}

My config file:

input {
sqs { 
	queue => "my-queue" 
	id_field => "my-queue" 
	polling_frequency => 5 
	region => "ap-south-1" 
	threads => 4
	access_key_id => "access_key"
	secret_access_key => "secret_key"
}
}
filter
{   
    if "Warm" in [info][displayName] or "warm" in [info][displayName] or "WARM" in [info][displayName] or "Hot" in [info][displayName] or "HOT" in [info][displayName] or "hot" in [info][displayName]
    {
                http
		  {
		  follow_redirects => false
		  url => "http://my-endpoint-url/threshold?walletId=%{[info][walletId]}&coin=%{[coin]}"
		  verb => "GET"
		  }

      if [confirmedBalanceUnitAmount] > [body][maxBalance]
      {
        mutate
        {
        add_field =>
        {
        "[violations][walletDisplayName]" => "%{[info][displayName]}"
        "[violations][coin]" => "%{[coin]}"
        "[violations][type]" => "thresholdLimit"
        "[violations][reason]" => "balance exceeding threshold limit"
        "[violations][violation]" => "violatedThresholdLimit"
        "[violations][Priority]" => "CRITICAL"
        "opsgenieAction" => "create"
        "Signal" => "%{[info][displayName]} ( %{[coin]} ) wallet balance is exceeding threshold limit. Current balance: %{[confirmedBalanceUnitAmount]}."
        "description" => "%{[Signal]}"
        "message" => "The wallet policy %{[violations][type]} has been violated for wallet %{[violations][walletDisplayName]}."
        }
        }
      }
    }   
    if "Warm" in [info][displayName] or "warm" in [info][displayName] or "WARM" in [info][displayName] or "Hot" in [info][displayName] or "HOT" in [info][displayName] or "hot" in [info][displayName]
    {
                http
		  {
		  follow_redirects => false
		  url => "http://my-endpoint-url/threshold?walletId=%{[info][walletId]}&coin=%{[coin]}"
		  verb => "GET"
		  }
      if [confirmedBalanceUnitAmount] < [body][minBalance]
      {
        mutate
        {
        add_field =>
        {
        "[violations][walletDisplayName]" => "%{[info][displayName]}"
        "[violations][coin]" => "%{[coin]}"
        "[violations][type]" => "thresholdLimit"
        "[violations][reason]" => "low balance in wallet"
        "[violations][violation]" => "violatedThresholdLimit"
        "[violations][Priority]" => "CRITICAL"
        "opsgenieAction" => "create"
        "Signal" => "%{[info][displayName]} ( %{[coin]} ) wallet balance is lower than threshold limit. Current balance: %{[confirmedBalanceUnitAmount]}."
        "description" => "%{[Signal]}"
        "message" => "The wallet policy %{[violations][type]} has been violated for wallet %{[violations][walletDisplayName]}."
        }        
        }    
      }
    }
}
output {
#        stdout { codec => rubydebug }
        
        elasticsearch 
        {
        codec => json
        hosts => [ "ES_Host:9200" ]
        index => "my-index-%{+YYYY.MM}"
        ssl => true
        ssl_certificate_verification => false
        user => 'elastic'
        password => "${es_pwd}"
	    }
}
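
One thing that may be worth ruling out, since the config compares [confirmedBalanceUnitAmount] against [body][maxBalance] and [body][minBalance]: if either side is missing from an event (for example when the http filter request fails or the response is not parsed as JSON), the conditional would be comparing against nil. This is an assumption, not a confirmed diagnosis of the TypeError above. A minimal sketch of a guarded comparison, where the tag names are hypothetical and the float conversion assumes both values are numeric:

```
filter {
  # Hypothetical guard: only compare when both fields are present on the event.
  if [confirmedBalanceUnitAmount] and [body][maxBalance] {
    # Assumption: both values are numeric, so convert them to the same type first.
    mutate {
      convert => {
        "confirmedBalanceUnitAmount" => "float"
        "[body][maxBalance]" => "float"
      }
    }
    if [confirmedBalanceUnitAmount] > [body][maxBalance] {
      # Placeholder action; the add_field block from the config above would go here.
      mutate { add_tag => [ "threshold_exceeded" ] }
    }
  } else {
    # Hypothetical tag that makes events with missing fields easy to find.
    mutate { add_tag => [ "threshold_fields_missing" ] }
  }
}
```

Re-enabling the commented-out stdout { codec => rubydebug } output would show whether [body] is actually populated on the events that reach the comparison.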
