Aggregation EPS and Average Elapsed Time

I need to create an aggregation that produces a count of events per second (EPS) and the average elapsed time for each operation, every second. I have the following, but with my test data I don't get any output.

Below is my filter. I am running with a single worker, and I am using the timestamp and operation to generate an output value for every operation every second. I would expect to see some output, but I get nothing. Any ideas what I'm missing?

grok {
  match => { "timestamp" => "(T%{HOUR:aggh}:%{MINUTE:aggm}:%{SECOND:aggs}.)" }
}
mutate {
  add_field => { "elapseindex" => "%{aggh}%{aggm}%{aggs}" }
  remove_field => [ "aggh", "aggm", "aggs" ]
}
mutate {
  convert => [ "elapsedtime", "integer" ]
  convert => [ "elapseindex", "integer" ]
}
if [operation] {
  aggregate {
    timeout_timestamp_field => "timestamp"
    task_id => "%{elapseindex}_%{operation}"
    code => "
      map['avg'] ||= 0;
      map['avg'] += event.get('elapsedtime');
      map['eps_count'] ||= 0;
      map['eps_count'] += 1;
    "
    push_map_as_event_on_timeout => true
    timeout => 5
    timeout_code => "
      event.set('elapsed_avg', (map['avg'] / map['eps_count']));
      event.set('events_per_second', 'eps_count');
      event.set('agg_operation', event.get('operation'));
      event.set('Aggregation', true);
    "
  }
}
if !['Aggregation'] {
  drop {}
}

What does the input look like?

{
    "elapsedtime" => 0,
         "status" => "SUCCESSFUL",
  "elapsedtimeunits" => "MILLISECONDS",
    "elapseindex" => 10101,
     "@timestamp" => 2019-02-19T14:11:32.120Z,
      "operation" => "SEARCH",
      "timestamp" => "2019-01-28T01:01:01.380Z"
}
{
    "elapsedtime" => 0,
         "status" => "SUCCESSFUL",
  "elapsedtimeunits" => "MILLISECONDS",
    "elapseindex" => 10101,
     "@timestamp" => 2019-02-19T14:11:32.120Z,
      "operation" => "SEARCH",
      "timestamp" => "2019-01-28T01:01:01.381Z"
}

I added a date filter on timestamp and I'm getting farther, but this is what I get from Logstash:

tap>, :timeout_code=>" \n event.set('elapsed_avg', (map['avg'] / map['eps_count']));\n event.set('events_per_second', 'eps_count');\n event.set('agg_operation', event.get('operation'));\n event.set('Aggregation', true);\n ", :timeout_event_data=>{"avg"=>0, "@timestamp"=>2019-02-19T14:49:31.454Z, "@version"=>"1", "eps_count"=>2}}

[ERROR] 2019-02-19 09:49:31.462 [LogStash::Runner] Logstash - org.jruby.exceptions.ThreadKill

[ERROR] 2019-02-19 09:49:31.464 [[main]>worker0] aggregate - Aggregate exception occurred {:error=>#<NameError: undefined local variable or method `map' for #<LogStash::Filters::Aggregate:0x4a31d81b>

Did you mean? map_action

map_action=

That is the output, what does the input look like?

You need to remove the quotes in the test of Aggregation, so that it is a field reference rather than a string literal:

if ![Aggregation] {

When the timeout_code executes, map no longer exists, but whatever was in the map for this task has been pre-populated onto the event, so use:

event.set('elapsed_avg', (event.get('avg') / event.get('eps_count')));
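The scope change can be sketched outside Logstash. In this minimal plain-Ruby mock (the `FakeEvent` class is a hypothetical stand-in for `LogStash::Event`, for illustration only), the map's entries are copied onto a fresh event when the timeout fires, and only the event remains in scope afterwards:

```ruby
# Stand-in for LogStash::Event; illustration only.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def get(key)
    @fields[key]
  end

  def set(key, value)
    @fields[key] = value
  end
end

# While events flow, the filter's code => block accumulates into the map:
map = {}
[0, 0].each do |elapsed|      # two sample events, elapsedtime = 0
  map['avg'] ||= 0
  map['avg'] += elapsed
  map['eps_count'] ||= 0
  map['eps_count'] += 1
end

# On timeout, the plugin copies the map's entries onto a new event;
# inside timeout_code, `map` itself is no longer defined, only the event.
event = FakeEvent.new(map.dup)
event.set('elapsed_avg', event.get('avg') / event.get('eps_count'))
puts event.get('elapsed_avg')   # 0
puts event.get('eps_count')     # 2
```

Referring to `map` at that point is what produced the `NameError: undefined local variable or method 'map'` above.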

Yeah, I think I just figured that out... Thanks!

aggregate {
  # timeout_timestamp_field => "timestamp"
  task_id => "%{elapseindex}_%{operation}"
  code => "
    map['avg'] ||= 0;
    map['avg'] += event.get('elapsedtime');
    map['eps_count'] ||= 0;
    map['eps_count'] += 1;
    map['operation'] = event.get('operation');
  "
  push_map_as_event_on_timeout => true
  timeout => 5
  timeout_code => "
    event.set('elapsed_avg', (event.get('avg') / event.get('eps_count')));
    event.set('Aggregation', true);
  "
}
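Run against the two sample SEARCH events above (both with elapsedtime 0 and the same elapseindex), the accumulation works out as follows. This is a plain-Ruby walk-through of the map logic, not the plugin itself:

```ruby
# The two sample events, reduced to the fields the map logic reads.
events = [
  { 'elapsedtime' => 0, 'operation' => 'SEARCH' },
  { 'elapsedtime' => 0, 'operation' => 'SEARCH' }
]

# Equivalent of the aggregate filter's code => block, run per event.
map = {}
events.each do |e|
  map['avg'] ||= 0
  map['avg'] += e['elapsedtime']
  map['eps_count'] ||= 0
  map['eps_count'] += 1
  map['operation'] = e['operation']
end

# What timeout_code computes once the map is flushed as an event:
elapsed_avg = map['avg'] / map['eps_count']   # integer division: 0 / 2 = 0
```

So the flushed event carries avg 0, eps_count 2, operation "SEARCH", and elapsed_avg 0, which matches the output shown further down.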

How does one reference the results of the aggregation in the output?
It doesn't appear to be %{message}. I am trying to add them to a syslog output.

Other than @version and @timestamp, the events will only have the fields you added to them...
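Given that, the aggregated fields can be referenced in the output with the usual %{fieldname} sprintf syntax. A sketch assuming the logstash-output-syslog plugin, with a placeholder host and port and field names taken from the aggregate above:

```
output {
  if [Aggregation] {
    syslog {
      host     => "syslog.example.com"   # placeholder destination
      port     => 514
      protocol => "udp"
      message  => "op=%{operation} eps=%{eps_count} avg_elapsed=%{elapsed_avg}"
    }
  }
}
```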

{
       "@timestamp" => 2019-02-19T17:44:10.705Z,
      "elapsed_avg" => 0,
              "avg" => 0,
        "eps_count" => 2,
"events_per_second" => "eps_count",
      "Aggregation" => true,
         "@version" => "1",
    "agg_operation" => nil
}

Thanks, that is what I figured... This is to get data into Splunk without impacting the licensing cost too badly... lol...