I have configured SQS as input, not sure why I am getting this error. I am not performing any conversation from my Logstash config file.
Can anyone help to identify the issue?
The following error message comes right after Successfully started Logstash API endpoint
:
[2021-08-12T07:58:44,148][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2021-08-12T07:58:58,956][ERROR][logstash.javapipeline ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(TypeError) no implicit conversion of nil into Integer", :exception=>Java::OrgJrubyExceptions::TypeError, :backtrace=>["usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)"], :thread=>"#<Thread:0x56308afb sleep>"}
[2021-08-12T07:58:58,998][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:00,005][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:01,008][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:02,011][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:03,013][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:04,017][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:05,020][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:06,022][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:07,025][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:08,027][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:20,004][ERROR][logstash.javapipeline ][main] Pipeline error {:pipeline_id=>"main", :exception=>#<RuntimeError: Unable to stop input plugin(s)>, :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:365:in `block in monitor_inputs_and_workers'", "org/jruby/RubyKernel.java:1442:in `loop'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:336:in `monitor_inputs_and_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:195:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:137:in `block in start'"], "pipeline.sources"=>["/etc/logstash/conf.d/my_conf_file.conf"], :thread=>"#<Thread:0x56308afb run>"}
[2021-08-12T07:59:20,009][INFO ][logstash.javapipeline ][main] Pipeline terminated {"pipeline.id"=>"main"}
my config file :
input {
sqs {
queue => "my-queue"
id_field => "my-queue"
polling_frequency => 5
region => "ap-south-1"
threads => 4
access_key_id => "access_key"
secret_access_key => "secret_key"
}
}
filter
{
if "Warm" in [info][displayName] or "warm" in [info][displayName] or "WARM" in [info][displayName] or "Hot" in [info][displayName] or "HOT" in [info][displayName] or "hot" in [info][displayName]
{
http
{
follow_redirects => false
url => "http://my-endpoint-url/threshold?walletId=%{[info][walletId]}&coin=%{[coin]}"
verb => "GET"
}
if [confirmedBalanceUnitAmount] > [body][maxBalance]
{
mutate
{
add_field =>
{
"[violations][walletDisplayName]" => "%{[info][displayName]}"
"[violations][coin]" => "%{[coin]}"
"[violations][type]" => "thresholdLimit"
"[violations][reason]" => "balance exceeding threshold limit"
"[violations][violation]" => "violatedThresholdLimit"
"[violations][Priority]" => "CRITICAL"
"opsgenieAction" => "create"
"Signal" => "%{[info][displayName]} ( %{[coin]} ) wallet balance is exceeding threshold limit. Current balance: %{[confirmedBalanceUnitAmount]}."
"description" => "%{[Signal]}"
"message" => "The wallet policy %{[violations][type]} has been violated for wallet %{[violations][walletDisplayName]}."
}
}
}
}
if "Warm" in [info][displayName] or "warm" in [info][displayName] or "WARM" in [info][displayName] or "Hot" in [info][displayName] or "HOT" in [info][displayName] or "hot" in [info][displayName]
{
http
{
follow_redirects => false
url => "http://my-endpoint-url/threshold?walletId=%{[info][walletId]}&coin=%{[coin]}"
verb => "GET"
}
if [confirmedBalanceUnitAmount] < [body][minBalance]
{
mutate
{
add_field =>
{
"[violations][walletDisplayName]" => "%{[info][displayName]}"
"[violations][coin]" => "%{[coin]}"
"[violations][type]" => "thresholdLimit"
"[violations][reason]" => "low balance in wallet"
"[violations][violation]" => "violatedThresholdLimit"
"[violations][Priority]" => "CRITICAL"
"opsgenieAction" => "create"
"Signal" => "%{[info][displayName]} ( %{[coin]} ) wallet balance is lower than threshold limit. Current balance: %{[confirmedBalanceUnitAmount]}."
"description" => "%{[Signal]}"
"message" => "The wallet policy %{[violations][type]} has been violated for wallet %{[violations][walletDisplayName]}."
}
}
}
}
}
output {
# stdout { codec => rubydebug }
elasticsearch
{
codec => json
hosts => [ "ES_Host:9200" ]
index => "my-index-%{+YYYY.MM}"
ssl => true
ssl_certificate_verification => false
user => 'elastic'
password => "${es_pwd}"
}
}