Exception in throttle

Hi,

I got exceptions while trying to use the throttle filter in Logstash. The exception is:

Exception in pipelineworker, the pipeline stopped processing new events, please 
check your filter configuration and restart Logstash. {:pipeline_id=>"kafka", 
"exception"=>"", "backtrace"=>["org.jruby.ir.targets.InvokeSite.pollAndGetClass(
InvokeSite.java:444)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:125)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.vendor.bundle.jruby.
$2_dot_3_dot_0.gems.logstash_minus_filter_minus_throttle_minus_4_dot_0_dot_
4.lib.logstash.filters.throttle.RUBY$method$filter$0(/home/kafka/kafka_tools/
logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/logstash-filter-throttle-4.0.4/lib/logstash/filters/throttle.rb:227)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMe
thod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(
MixedModeIRM
ethod.java:163)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:187)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_core
.lib.logstash.filters.base.RUBY$method$do_filter$0(/home/kafka/kafka_too
ls/logstash-6.2.4/logstash-core/lib/logstash/filters/base.rb:145)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledI
RMethod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(Mixed
ModeIRMethod.java:163)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.ja
va:187)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_c
ore.lib.logstash.filters.base.RUBY$block$multi_filter$1(/home/kafka/kafka
_tools/logstash-6.2.4/logstash-core/lib/logstash/filters/base.rb:164)", "org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBo
dy.java:156)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:114)", "org.jruby.runtime.Block.yield(Block.java:165)", "org.jruby.RubyArray.each
(RubyArray.java:1734)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_
core.lib.logstash.filters.base.RUBY$method$multi_filter$0(/home/kafka/ka
fka_tools/logstash-6.2.4/logstash-core/lib/logstash/filters/base.rb:161)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRM
ethod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedMo
deIRMethod.java:163)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:187)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_c
ore.lib.logstash.filter_delegator.RUBY$method$multi_filter$0(/home/kafka/kafka_tools/logstash-6.2.4/logstash-core/lib/logstash/filter_delegator.rb:47)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(Compile
dIRMethod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(Mixed
ModeIRMethod.java:163)", "org.jruby.internal.runtime.methods.Dynam
icMethod.call(DynamicM
ethod.java:200)", "org.jruby.runtime.callsite.CachingCallSite.call(Cachin
gCallSite.java:161)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:73)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:132)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:148)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:73)", "org.jruby.runtime.Block.call(Block.java:124)", "org.jruby.RubyProc.call(RubyProc.java:289)", "org.jruby.internal.runtime.methods.ProcMethod.call(ProcMethod.java:63)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:204)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:200)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_core.lib.logstash.pipeline.RUBY$method$filter_batch$0(/home/kafka/kafka_tools/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:445)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_core.lib.logstash.pipeline.RUBY$method$worker_loop$0(/home/kafka/kafka_tools/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:424)", "home.kafka.kafka_tools.logstash_minus_6_dot_2_dot_4.logstash_minus_core.lib.logstash.pipeline.RUBY$block$start_workers$2(/home/kafka/kafka_tools/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:386)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:145)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:71)", "org.jruby.runtime.Block.call(Block.java:124)", "org.jruby.RubyProc.call(RubyProc.java:289)", "org.jruby.RubyProc.call(RubyProc.java:246)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:104)", "java.lang.Thread.run(Thread.java:748)"], :thread=>"#<Thread:0x43191d9f sleep>"}

My Logstash configuration is:

# Receive events shipped by Filebeat over the Beats protocol.
input {
    beats {
        port => "5044"
    }
}

filter {
    # Parse the raw log line: extract the bracketed timestamp into
    # log_time_str, the log level into log_level, and the rest of the
    # line into content (with its leading bracketed/prefixed token also
    # captured as component). On a successful match, drop the Filebeat
    # bookkeeping fields that are no longer needed.
    grok {
        remove_field => ["message", "beat", "prospector", "@version", "offset"]
        match => {
          "message" => "\[(?<log_time_str>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND},[0-9]{3})\] %{LOGLEVEL:log_level} (?<content>\[?(?<component>(\[.*\])|(.*?[0-9])|(.*?))\]?(:|,)? ?%{GREEDYDATA})"
        }
    }

    # Derive log_type (the log file's basename, e.g. "server.log") from
    # the "source" field — presumably the file path shipped by Filebeat;
    # verify against the beats input's field naming.
    grok {
        match => {
          "source" => ".+/(?<log_type>.+\.log)"
        }
    }

    # Copy — do NOT rename — @timestamp into filebeat_time.
    # Renaming removes @timestamp from the event, and the throttle
    # filter below reads the event's @timestamp; with it gone, throttle
    # raises the "Exception in pipelineworker" error shown above
    # (backtrace points at throttle.rb:227). copy keeps filebeat_time
    # exactly as before while leaving @timestamp intact.
    mutate {
        copy => { "@timestamp" => "filebeat_time" }
    }

    # Convert the grok-extracted log_time_str (format
    # "yyyy-MM-dd HH:mm:ss,SSS", interpreted in America/Los_Angeles)
    # into a proper timestamp stored in log_time, then discard the raw
    # string field.
    date {
        timezone => "America/Los_Angeles"
        match => ["log_time_str", "yyyy-MM-dd HH:mm:ss,SSS"]
        target => "log_time"
        remove_field => ["log_time_str"]
    }




    # Stamp each event with the wall-clock time at which Logstash
    # processed it (millisecond precision, local time zone), in the
    # same "%Y-%m-%d %H:%M:%S.%L" format as before.
    # Time.now replaces require "date" + DateTime.now: Time is always
    # loaded (no per-event require), and its strftime supports the
    # identical format string, including %L for milliseconds.
    ruby {
        code => 'event.set("logstash_time", Time.now.strftime("%Y-%m-%d %H:%M:%S.%L"))'
    }



# Drop low-severity events, with the per-cluster policies of the
# original six duplicated branches consolidated:
#   - TRACE is dropped for every cluster (every branch, including the
#     default, dropped TRACE).
#   - DEBUG is dropped everywhere except GCPPerfCluster and
#     GCPIntegrationCluster, which keep their DEBUG logs.
#   - ALL is dropped only for the six explicitly listed clusters; the
#     original default branch did not drop ALL, and that behavior is
#     preserved here.
# Events with no clustername field behave as the original default
# branch did: TRACE and DEBUG dropped, ALL kept.
if [log_level] == "TRACE" {
    drop { }
} else if [log_level] == "DEBUG" and [clustername] not in ["GCPPerfCluster", "GCPIntegrationCluster"] {
    drop { }
} else if [log_level] == "ALL" and [clustername] in ["UpgradeTestV11", "kafkaloganalyzer", "GCPPerfCluster", "IntegrationCluster2", "kafkadev3", "GCPIntegrationCluster"] {
    drop { }
}

    # Rate-limit repeated log lines sharing the same grok-extracted
    # content: events whose per-key count over the one-hour period falls
    # outside the before_count/after_count window are tagged "throttled"
    # (and dropped just below); idle counters expire after two hours.
    # NOTE(review): this plugin reads the event's @timestamp — the
    # backtrace quoted above (throttle.rb:227) was raised from here
    # after @timestamp had been renamed away by the mutate filter;
    # confirm @timestamp is present before this point.
    throttle {
        before_count => 3
        after_count => 5
        period => 3600
        max_age => 7200
        key => "%{content}"
        add_tag => "throttled"
    }

    # Discard the events the throttle filter tagged above, so only the
    # allowed rate of duplicate lines reaches the output.
    if "throttled" in [tags] {
        drop { }
    }

}

# Index the filtered events into Elasticsearch.
output {
    # stdout { codec => rubydebug }   # uncomment for local debugging
    elasticsearch {
        hosts => ["10.176.15.1:9200"]
        index => "kafka_log_qa"
    }
}

Thanks!

Got it. The @timestamp field is renamed away by the mutate filter, and the throttle filter depends on the event's @timestamp — that is what triggers the exception. Copying @timestamp into filebeat_time (mutate's copy option) instead of renaming it resolves the problem.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.