Aggregation error - nil can't be coerced into Fixnum

I think you need to take elasticsearch and kibana out of the equation and just run with

output { stdout { codec => rubydebug } }

See if the events that logstash generates are what you expect. Then adjust either the events or your expectations. Start off with a really simple configuration like the one I posted earlier in the thread (swapping out the json filter for your csv filter).

I did that and it is not working. I am not getting the final event.
I should also mention that I am running version 6.6.0 on a Windows machine.

My Input file: (foo.json)

{ "PROBLEM" : 100 }
{ "PROBLEM" : 101 }
{ "PROBLEM" : 102 }
{ "PROBLEM" : 103 }
{ "PROBLEM" : 104 }
{ "PROBLEM" : 105 }
{ "PROBLEM" : 106 }

Input:

input {
       file {
             path => "C:/kpi/foo.json"
             start_position => "beginning"
             sincedb_path => "NUL"
       }
}

My filter:

filter {
     if [path] == "C:/kpi/foo.json"
     {
         json { source => "message" }
         aggregate {
                task_id => "%{PROBLEM}"
                code => "
                     map['FooJustTesting'] ||= 0; map['FooJustTesting'] += 1
                "
                push_map_as_event_on_timeout => true
                inactivity_timeout => 2
         }
     }
}

My output:

output {
     if [path] == "C:/kpi/foo.json"
     {
         stdout{}
     }
}

The stdout results:

   {
          "host" => "SHARONSA03",
       "message" => "{ \"PROBLEM\" : 100 }\r",
    "@timestamp" => 2019-02-21T14:02:00.457Z,
          "path" => "C:/kpi/foo.json",
      "@version" => "1",
       "PROBLEM" => 100
}
{
          "host" => "SHARONSA03",
       "message" => "{ \"PROBLEM\" : 104 }\r",
    "@timestamp" => 2019-02-21T14:02:00.522Z,
          "path" => "C:/kpi/foo.json",
      "@version" => "1",
       "PROBLEM" => 104
}
{
          "host" => "SHARONSA03",
       "message" => "{ \"PROBLEM\" : 102 }\r",
    "@timestamp" => 2019-02-21T14:02:00.521Z,
          "path" => "C:/kpi/foo.json",
      "@version" => "1",
       "PROBLEM" => 102
}
{
          "host" => "SHARONSA03",
       "message" => "{ \"PROBLEM\" : 103 }\r",
    "@timestamp" => 2019-02-21T14:02:00.522Z,
          "path" => "C:/kpi/foo.json",
      "@version" => "1",
       "PROBLEM" => 103
}
{
          "host" => "SHARONSA03",
       "message" => "{ \"PROBLEM\" : 101 }\r",
    "@timestamp" => 2019-02-21T14:02:00.518Z,
          "path" => "C:/kpi/foo.json",
      "@version" => "1",
       "PROBLEM" => 101
}
{
          "host" => "SHARONSA03",
       "message" => "{ \"PROBLEM\" : 105 }\r",
    "@timestamp" => 2019-02-21T14:02:00.523Z,
          "path" => "C:/kpi/foo.json",
      "@version" => "1",
       "PROBLEM" => 105
}
{
          "host" => "SHARONSA03",
       "message" => "{ \"PROBLEM\" : 106 }\r",
    "@timestamp" => 2019-02-21T14:02:00.523Z,
          "path" => "C:/kpi/foo.json",
      "@version" => "1",
       "PROBLEM" => 106
}

Thanks
Sharon.

You have "--pipeline.workers 1", right?

I don't know. How do I check?

I have the default:

# This defaults to the number of the host's CPU cores.
#
# pipeline.workers: 2

It is commented out in logstash.yml.

Uncomment that and set it to 1.

{
    "@timestamp" => 2019-02-21T14:49:36.496Z,
          "host" => "SHARONSA03",
       "PROBLEM" => 100,
       "message" => "{ \"PROBLEM\" : 100 }\r",
      "@version" => "1",
          "path" => "C:/kpi/foo.json"
}
{
    "@timestamp" => 2019-02-21T14:49:36.558Z,
          "host" => "SHARONSA03",
       "PROBLEM" => 101,
       "message" => "{ \"PROBLEM\" : 101 }\r",
      "@version" => "1",
            "path" => "C:/kpi/foo.json"
}
{
    "@timestamp" => 2019-02-21T14:49:36.558Z,
          "host" => "SHARONSA03",
       "PROBLEM" => 102,
       "message" => "{ \"PROBLEM\" : 102 }\r",
      "@version" => "1",
            "path" => "C:/kpi/foo.json"
}
{
    "@timestamp" => 2019-02-21T14:49:36.574Z,
          "host" => "SHARONSA03",
       "PROBLEM" => 103,
       "message" => "{ \"PROBLEM\" : 103 }\r",
      "@version" => "1",
           "path" => "C:/kpi/foo.json"
}
{
    "@timestamp" => 2019-02-21T14:49:36.574Z,
          "host" => "SHARONSA03",
       "PROBLEM" => 104,
       "message" => "{ \"PROBLEM\" : 104 }\r",
      "@version" => "1",
           "path" => "C:/kpi/foo.json"
}
{
    "@timestamp" => 2019-02-21T14:49:36.574Z,
          "host" => "SHARONSA03",
       "PROBLEM" => 105,
       "message" => "{ \"PROBLEM\" : 105 }\r",
      "@version" => "1",
               "path" => "C:/kpi/foo.json"
}
{
    "@timestamp" => 2019-02-21T14:49:36.574Z,
          "host" => "SHARONSA03",
       "PROBLEM" => 106,
       "message" => "{ \"PROBLEM\" : 106 }\r",
      "@version" => "1",
            "path" => "C:/kpi/foo.json"
}

The aggregate event will not have a path field because you did not add it to the map. Remove this conditional.

Good... We are getting close to a solution.....

Now I see this at the end of the stdout:

{
    "FooJustTesting" => 1,
        "@timestamp" => 2019-02-21T15:04:44.247Z,
          "@version" => "1"
}
{
    "FooJustTesting" => 1,
        "@timestamp" => 2019-02-21T15:04:44.262Z,
          "@version" => "1"
}
{
    "FooJustTesting" => 1,
        "@timestamp" => 2019-02-21T15:04:44.262Z,
          "@version" => "1"
}
{
    "FooJustTesting" => 1,
        "@timestamp" => 2019-02-21T15:04:44.262Z,
          "@version" => "1"
}
{
    "FooJustTesting" => 1,
        "@timestamp" => 2019-02-21T15:04:44.262Z,
          "@version" => "1"
}
{
    "FooJustTesting" => 1,
        "@timestamp" => 2019-02-21T15:04:44.262Z,
          "@version" => "1"
}
{
    "FooJustTesting" => 1,
        "@timestamp" => 2019-02-21T15:04:44.262Z,
          "@version" => "1"
}

Shouldn't I get "FooJustTesting" => 6?

map['FooJustTesting'] ||= 0; map['FooJustTesting'] += 1

How do I add a path to the map?

The path is how I differentiate between the various inputs.

No. There are 7 different map entries, one for each value of PROBLEM. Add

timeout_task_id_field => "PROBLEM"

and you will see that. If you want the path to be on the event, then add

map['path'] = event.get('path')

to the code in the aggregate filter.
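To make the "7 different map entries" point concrete, here is a minimal Ruby sketch (a rough simulation of the filter's bookkeeping, not the plugin's actual code; the sample events mirror foo.json above) of how the aggregate filter keeps one map per task_id:

```ruby
# Rough simulation of the aggregate filter's per-task_id maps.
events = (100..106).map { |n| { 'PROBLEM' => n } }

maps = Hash.new { |h, k| h[k] = {} }   # one map per task_id
events.each do |event|
  map = maps[event['PROBLEM'].to_s]    # task_id => "%{PROBLEM}"
  map['FooJustTesting'] ||= 0
  map['FooJustTesting'] += 1
end

# On inactivity_timeout each map is pushed as its own event, so you
# get seven events, each with FooJustTesting == 1.
maps.each { |task_id, map| puts "PROBLEM=#{task_id} #{map.inspect}" }
```

Since every PROBLEM value is unique, no map is ever updated twice, which is why every timeout event carries a count of 1.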

The "path" is working !

FooJustTesting is still 1. My purpose is to aggregate the values in the PROBLEM field.
Let's assume the field contains a duration. I am trying to aggregate the durations, to get the total duration across all the events.

{
    "FooJustTesting" => 1,
          "@version" => "1",
        "@timestamp" => 2019-02-21T15:27:22.405Z,
           "PROBLEM" => "100",
              "path" => "C:/kpi/foo.json"
}
{
    "FooJustTesting" => 1,
          "@version" => "1",
        "@timestamp" => 2019-02-21T15:27:22.441Z,
           "PROBLEM" => "101",
         "path" => "C:/kpi/foo.json"
}
{
    "FooJustTesting" => 1,
          "@version" => "1",
        "@timestamp" => 2019-02-21T15:27:22.441Z,
           "PROBLEM" => "102",
              "path" => "C:/kpi/foo.json"
}
{
    "FooJustTesting" => 1,
          "@version" => "1",
        "@timestamp" => 2019-02-21T15:27:22.441Z,
           "PROBLEM" => "103",
            "path" => "C:/kpi/foo.json"
}
{
    "FooJustTesting" => 1,
          "@version" => "1",
        "@timestamp" => 2019-02-21T15:27:22.442Z,
           "PROBLEM" => "104",
             "path" => "C:/kpi/foo.json"
}
{
    "FooJustTesting" => 1,
          "@version" => "1",
        "@timestamp" => 2019-02-21T15:27:22.442Z,
           "PROBLEM" => "105",
            "path" => "C:/kpi/foo.json"
}
{
    "FooJustTesting" => 1,
          "@version" => "1",
        "@timestamp" => 2019-02-21T15:27:22.442Z,
           "PROBLEM" => "106",
           "path" => "C:/kpi/foo.json"
}

I think I understand. Let me try.

This is now my aggregation:

filter {
     if [path] == "C:/Users/sharonsa/Work/GSS/VIVO/kpi/foo.json"
     {
         json { source => "message" }
         aggregate {
                task_id => "%{PROBLEM}"
                code => "
                     map['FooJustTesting'] ||= 0; map['FooJustTesting'] += event.get('PROBLEM')
                     map['path'] = event.get('path')
                     event.set('TotalDuration', map['FooJustTesting'])
                "
                timeout_task_id_field => "PROBLEM"
                push_map_as_event_on_timeout => true
                inactivity_timeout => 2
         }
     }
}

I expected TotalDuration to contain the total of the values (100+101+... etc.)

The current output is:

{
          "PROBLEM" => 100,
    "TotalDuration" => 100,
         "@version" => "1",
       "@timestamp" => 2019-02-21T15:43:56.254Z,
             "path" => "C:/Users/sharonsa/Work/GSS/VIVO/kpi/foo.json",
          "message" => "{ \"PROBLEM\" : 100 }\r",
             "host" => "SHARONSA03"
}
{
          "PROBLEM" => 101,
    "TotalDuration" => 101,
         "@version" => "1",
       "@timestamp" => 2019-02-21T15:43:56.337Z,
             "path" => "C:/Users/sharonsa/Work/GSS/VIVO/kpi/foo.json",
          "message" => "{ \"PROBLEM\" : 101 }\r",
             "host" => "SHARONSA03"
}
{
          "PROBLEM" => 102,
    "TotalDuration" => 102,
         "@version" => "1",
       "@timestamp" => 2019-02-21T15:43:56.337Z,
             "path" => "C:/Users/sharonsa/Work/GSS/VIVO/kpi/foo.json",
          "message" => "{ \"PROBLEM\" : 102 }\r",
             "host" => "SHARONSA03"
}
etc.....
{
    "FooJustTesting" => 100,
        "@timestamp" => 2019-02-21T15:44:03.831Z,
           "PROBLEM" => "100",
              "path" => "C:/Users/sharonsa/Work/GSS/VIVO/kpi/foo.json",
          "@version" => "1"
}
{
    "FooJustTesting" => 101,
        "@timestamp" => 2019-02-21T15:44:03.862Z,
           "PROBLEM" => "101",
              "path" => "C:/Users/sharonsa/Work/GSS/VIVO/kpi/foo.json",
          "@version" => "1"
}
{
    "FooJustTesting" => 102,
        "@timestamp" => 2019-02-21T15:44:03.862Z,
           "PROBLEM" => "102",
              "path" => "C:/Users/sharonsa/Work/GSS/VIVO/kpi/foo.json",
          "@version" => "1"
}

etc....

Where is my current mistake?

An aggregate filter will aggregate data for a particular task_id. In your example, every record has a different task_id value.

Going back to your original data set. If you want to aggregate data from different records for the same incident ticket, then use the incident id as the task id. If you want to aggregate across all records, use a constant as the task_id. For example,

    mutate { add_field => { "[@metadata][task_id]" => "1" } }
    aggregate {
        task_id => "%{[@metadata][task_id]}"
        code => " map['JustTesting'] ||= 0; map['JustTesting'] += 1 "
        push_map_as_event_on_timeout => true
        timeout_code => 'event.set("[@metadata][wanted]", true)'
        inactivity_timeout => 2
    }
    if ! [@metadata][wanted] { drop {} }

will get you

{
"JustTesting" => 7,
   "@version" => "1",
 "@timestamp" => 2019-02-21T16:48:04.015Z
}
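As a sanity check, the same style of sketch with a constant task_id (a rough simulation mirroring the [@metadata][task_id] trick above, not the plugin's actual code) shows everything collapsing into a single map:

```ruby
# Rough simulation: a constant task_id means every event
# accumulates into the same map.
events = (100..106).map { |n| { 'PROBLEM' => n } }

maps = Hash.new { |h, k| h[k] = {} }
events.each do |event|
  map = maps['1']              # "[@metadata][task_id]" is always "1"
  map['JustTesting'] ||= 0
  map['JustTesting'] += 1
end

puts maps['1']                 # a single map with JustTesting == 7
```

Swap `+= 1` for `+= event['PROBLEM']` and the same single map would hold the sum of the durations instead of the count.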

Working!!!!

{
        "@timestamp" => 2019-02-21T17:11:54.205Z,
    "FooJustTesting" => 721,
              "path" => "C:/kpi/foo.json",
          "@version" => "1"
}
