Hi,
I got an exception while trying to use the throttle filter in Logstash. The exception is:
Exception in pipelineworker, the pipeline stopped processing new events, please
check your filter configuration and restart Logstash. {:pipeline_id=>"kafka",
"exception"=>"", "backtrace"=>["org.jruby.ir.targets.InvokeSite.pollAndGetClass(
InvokeSite.java:444)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:125)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.vendor.bundle.jruby.
$2_dot_3_dot_0.gems.logstash_minus_filter_minus_throttle_minus_4_dot_0_dot_
4.lib.logstash.filters.throttle.RUBY$method$filter$0(/home/kafka/kafka_tools/
logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/logstash-filter-throttle-4.0.4/lib/logstash/filters/throttle.rb:227)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMe
thod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(
MixedModeIRM
ethod.java:163)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:187)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_core
.lib.logstash.filters.base.RUBY$method$do_filter$0(/home/kafka/kafka_too
ls/logstash-6.2.4/logstash-core/lib/logstash/filters/base.rb:145)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledI
RMethod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(Mixed
ModeIRMethod.java:163)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.ja
va:187)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_c
ore.lib.logstash.filters.base.RUBY$block$multi_filter$1(/home/kafka/kafka
_tools/logstash-6.2.4/logstash-core/lib/logstash/filters/base.rb:164)", "org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBo
dy.java:156)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:114)", "org.jruby.runtime.Block.yield(Block.java:165)", "org.jruby.RubyArray.each
(RubyArray.java:1734)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_
core.lib.logstash.filters.base.RUBY$method$multi_filter$0(/home/kafka/ka
fka_tools/logstash-6.2.4/logstash-core/lib/logstash/filters/base.rb:161)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRM
ethod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedMo
deIRMethod.java:163)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:187)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_c
ore.lib.logstash.filter_delegator.RUBY$method$multi_filter$0(/home/kafka/kafka_tools/logstash-6.2.4/logstash-core/lib/logstash/filter_delegator.rb:47)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(Compile
dIRMethod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(Mixed
ModeIRMethod.java:163)", "org.jruby.internal.runtime.methods.Dynam
icMethod.call(DynamicM
ethod.java:200)", "org.jruby.runtime.callsite.CachingCallSite.call(Cachin
gCallSite.java:161)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:73)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:132)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:148)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:73)", "org.jruby.runtime.Block.call(Block.java:124)", "org.jruby.RubyProc.call(RubyProc.java:289)", "org.jruby.internal.runtime.methods.ProcMethod.call(ProcMethod.java:63)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:204)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:200)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_core.lib.logstash.pipeline.RUBY$method$filter_batch$0(/home/kafka/kafka_tools/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:445)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_core.lib.logstash.pipeline.RUBY$method$worker_loop$0(/home/kafka/kafka_tools/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:424)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_core.lib.logstash.pipeline.RUBY$block$start_workers$2(/home/kafka/kafka_tools/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:386)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:145)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:71)", "org.jruby.runtime.Block.call(Block.java:124)", "org.jruby.RubyProc.call(RubyProc.java:289)", "org.jruby.RubyProc.call(RubyProc.java:246)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:104)", "java.lang.Thread.run(Thread.java:748)"], :thread=>"#<Thread:0x43191d9f sleep>"}
My Logstash configuration is:
# Receive events from Beats shippers (e.g. Filebeat) on port 5044.
input {
  beats {
    port => "5044"
  }
}
# Parse log lines, normalise timestamps, drop low-severity events per cluster,
# and rate-limit repeated messages before they reach the output.
filter {
  # Split "[yyyy-MM-dd HH:mm:ss,SSS] LEVEL content" into log_time_str,
  # log_level, content and component. The listed fields are removed only
  # when the pattern matches.
  grok {
    match => {
      "message" => "\[(?<log_time_str>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND},[0-9]{3})\] %{LOGLEVEL:log_level} (?<content>\[?(?<component>(\[.*\])|(.*?[0-9])|(.*?))\]?(:|,)? ?%{GREEDYDATA})"
    }
    remove_field => ["message", "beat", "prospector", "@version", "offset"]
  }

  # Derive the log file name (e.g. "server.log") from the source path.
  grok {
    match => {
      "source" => ".+/(?<log_type>.+\.log)"
    }
  }

  # Keep the shipper's timestamp under its own name.
  # NOTE: this must be a copy, not a rename. Renaming removes @timestamp from
  # the event, and the throttle filter below reads the event timestamp to
  # decide which time slot an event belongs to; with @timestamp missing it
  # fails with an empty-message exception and the pipeline worker stops.
  mutate {
    copy => { "@timestamp" => "filebeat_time" }
  }

  # Convert the timestamp parsed from the log line into a real date field.
  date {
    match => ["log_time_str", "yyyy-MM-dd HH:mm:ss,SSS"]
    timezone => "America/Los_Angeles"
    target => "log_time"
    remove_field => ["log_time_str"]
  }

  # Record the wall-clock time at which Logstash processed the event.
  ruby {
    code => 'require "date"
      current_time = DateTime.now
      t = current_time.strftime("%Y-%m-%d %H:%M:%S.%L")
      event.set("logstash_time", t)'
  }

  # Per-cluster severity filtering:
  #   - the GCP clusters keep DEBUG and drop only ALL/TRACE;
  #   - the other named clusters drop ALL/TRACE/DEBUG;
  #   - every other cluster drops TRACE/DEBUG.
  if [clustername] in ["GCPPerfCluster", "GCPIntegrationCluster"] {
    if [log_level] in ["ALL", "TRACE"] {
      drop { }
    }
  } else if [clustername] in ["UpgradeTestV11", "kafkaloganalyzer", "IntegrationCluster2", "kafkadev3"] {
    if [log_level] in ["ALL", "TRACE", "DEBUG"] {
      drop { }
    }
  } else {
    if [log_level] in ["TRACE", "DEBUG"] {
      drop { }
    }
  }

  # Rate-limit events that share the same content: within each 3600 s period
  # the first two and everything after the fifth are tagged "throttled".
  throttle {
    key => "%{content}"
    before_count => 3
    after_count => 5
    period => 3600
    max_age => 7200
    add_tag => "throttled"
  }

  # Discard whatever the throttle filter tagged.
  if "throttled" in [tags] {
    drop { }
  }
}
# Ship every event that survived filtering to Elasticsearch.
output {
  # Uncomment the next line to print events to the console while debugging.
  # stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["10.176.15.1:9200"]
    index => "kafka_log_qa"
  }
}
Thanks!