Cannot get the aggregate plugin to work

I am trying to compute response times and therefore do a line2 - line1 type operation.

As I couldn't get this to work with real data, I wrote a tiny python program which logs in this format:

The idea now is compare what Logstash returns from the computation of agg_total+=RandomNumber, with the value of Total.

This is the Exception I get when I try to start Logstash (1.5.2 or 1.5.3 )

Exception in filterworker {"exception"=>#<NoMethodError: undefined method `+' for nil:NilClass>, "backtrace"=>["(aggregate filter code):1:in `register'", "org/jruby/RubyProc.java:271:in `call'", "/Users/AnoBan/Elastic/logstash-1.5.2/vendor/bundle/jruby/1.9/gems/logstash-filter-aggregate-0.1.3/lib/logstash/filters/aggregate.rb:204:in `filter'", "org/jruby/ext/thread/Mutex.java:149:in `synchronize'", "/Users/AnoBan/Elastic/logstash-1.5.2/vendor/bundle/jruby/1.9/gems/logstash-filter-aggregate-0.1.3/lib/logstash/filters/aggregate.rb:191:in `filter'", "/Users/AnoBan/Elastic/logstash-1.5.2/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.2.2-java/lib/logstash/filters/base.rb:163:in `multi_filter'", "org/jruby/RubyArray.java:1613:in `each'", "/Users/AnoBan/Elastic/logstash-1.5.2/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.2.2-java/lib/logstash/filters/base.rb:160:in `multi_filter'", "(eval):428:in `cond_func_17'", "org/jruby/RubyArray.java:1613:in `each'", "(eval):425:in `cond_func_17'", "(eval):443:in `cond_func_16'", "org/jruby/RubyArray.java:1613:in `each'", "(eval):439:in `cond_func_16'", "(eval):474:in `cond_func_14'", "org/jruby/RubyArray.java:1613:in `each'", "(eval):469:in `cond_func_14'", "(eval):207:in `filter_func'", "/Users/AnoBan/Elastic/logstash-1.5.2/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.2.2-java/lib/logstash/pipeline.rb:218:in `filterworker'", "/Users/AnoBan/Elastic/logstash-1.5.2/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.2.2-java/lib/logstash/pipeline.rb:156:in `start_filters'"], :level=>:error}

This is my log stash Config:


input {

# Fake file for the Aggregate
  file {
    path => "/Users/AnoBan/Elastic/aggregator/*test"
    type => "aggregate"
  }
}

filter {
#    if [type] == "system" {
#        grok { 
#            match => [ "message" , "%{OSX}" ]
#        }
#    }
    if [type] == "aggregate" {
        grok {
            match => [ "message" , "%{GREEDYDATA},%{NOTSPACE:counter:int},%{NOTSPACE:id},%{NOTSPACE:total:int},%{NOTSPACE:random:int},%{NOTSPACE:key}" ]
        }
        if [key] == "START" {
            mutate { add_tag => [ "keyIsStart" ] }
            aggregate {
                task_id => "%{id}"
                code => "map['total_value'] = 0"
                map_action => "create"
                add_tag => [ "aggregateStart" ]
            }
        }
        if [key] == "MIDDLE" {
            mutate { add_tag => [ "keyIsMiddle" ] }
            if [random] {
                aggregate {
                    task_id => "%{id}"
                    code => "map['total_value'] += event['random']"
                    add_tag => [ "aggregateMiddle" ]
                }
            }
        }
        if [key] == "END" {
            mutate { add_tag => [ "keyIsEnd" ] }
            aggregate {
                task_id => "%{id}"
                code => "event['agg_total'] = map['total_value']"
                end_of_task => true
                add_tag => [ "aggregateEnd" ]
            }
        }
    }
}

output {
    elasticsearch {
        host => "127.0.0.1"
        protocol => "transport"
    }
    stdout { codec => "rubydebug" }
}

Adding

On middle and end blocks fixed the issue.

I was basically trying to run code on non existing maps.

1 Like