Hi there,
I have a workflow with two Logstash's and a rabbitmq in the middle to store messages temporarily.
On the second logstash, the one behind the rabbitmq, I'm using the following code to set a field with the time the message reached the logstash:
filter {
ruby {
code => "event.set('process_time',Time.now.utc.strftime('%FT%T.%LZ'))"
}
}
After some time It stops getting messages from the rabbitmq queue and when i try to restart the service I get the following error:
{"inflight_count"=>45111, "stalling_thread_info"=>{["LogStash::Filters::Ruby", {"code"=>"event.set('process_time',Time.now.utc.strftime('%FT%T.%LZ'))"}]=>[{"thread_id"=>58, "name"=>"[main]>worker0", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/manticore-0.6.1-java/lib/manticore/response.rb:50:in
call'"}, {"thread_id"=>59, "name"=>"[main]>worker1", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:in
pop'"}, {"thread_id"=>60, "name"=>"[main]>worker2", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:inpop'"}, {"thread_id"=>61, "name"=>"[main]>worker3", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:in
pop'"}, {"thread_id"=>63, "name"=>"[main]>worker5", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:inpop'"}, {"thread_id"=>64, "name"=>"[main]>worker6", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:in
pop'"}, {"thread_id"=>65, "name"=>"[main]>worker7", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:inpop'"}, {"thread_id"=>66, "name"=>"[main]>worker8", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:in
pop'"}, {"thread_id"=>68, "name"=>"[main]>worker10", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/manticore-0.6.1-java/lib/manticore/response.rb:50:incall'"}, {"thread_id"=>69, "name"=>"[main]>worker11", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:in
pop'"}, {"thread_id"=>70, "name"=>"[main]>worker12", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:inpop'"}, {"thread_id"=>71, "name"=>"[main]>worker13", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:in
pop'"}, {"thread_id"=>72, "name"=>"[main]>worker14", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/manticore-0.6.1-java/lib/manticore/response.rb:50:incall'"}, {"thread_id"=>73, "name"=>"[main]>worker15", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/output_delegator.rb:128:in
pop'"}]}} {:level=>:warn}
I can't find information about ruby filter and if it's thread-safe.
Also is there any other way to timestamp the message when it reaches certain point on the pipeline?