Using the aggregate filter with multiple task_ids

Hi all, I am using the jmx input plugin to ship WebLogic MBean metrics into ELK. For this purpose, I am using the following configuration in Logstash:

input {
  jmx {
    path => "C:\logstash\jmxconf"
    polling_frequency => 5
    type => "jmx"
    nb_thread => 4
  }
}

filter {

  mutate {
    split => ["metric_path", "."]
    add_field => {
      "jmx_path"  => "%{metric_path[1]}"
      "jmx_att1"  => "%{metric_path[2]}"
      "jmx_att2"  => "%{metric_path[3]}"
      "jmx_alias" => "%{metric_path[0]}"
    }
  }

  if "Memory" in [jmx_path] {
    mutate {
      add_field => { "jmx_att3" => "%{jmx_att1}_%{jmx_att2}" }
    }
  } else {
    mutate {
      add_field => { "jmx_att3" => "%{jmx_att1}" }
    }
  }

  aggregate {
    task_id => "%{jmx_alias}"
    code => "
      map['jmx_alias'] = event.get('jmx_alias')
      map['info'] ||= []
      map['info'] << { event.get('jmx_att3') => event.get('metric_value_number') }
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 4
  }

}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test_%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}

In the "C:\logstash\jmxconf" directory, the following configuration files (one per alias) exist:

1-

{
  "host" : "1.0.0.1",
  "port" : 8888,
  "alias" : "srv1",
  "queries" : [
  {
    "object_name" : "java.lang:type=Memory",
    "attributes" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ],
    "object_alias" : "Memory"
  },{
    "object_name" : "java.lang:type=Threading",
    "attributes" : [ "ThreadCount", "TotalStartedThreadCount", "DaemonThreadCount", "PeakThreadCount" ],
    "object_alias" : "Threading"
  },{
    "object_name" : "com.bea:ServerRuntime=MS1,Name=ThreadPoolRuntime,Type=ThreadPoolRuntime",
    "attributes" : [ "StuckThreadCount", "Throughput", "QueueLength", "HoggingThreadCount", "StandbyThreadCount" ],
    "object_alias" : "ThreadPoolRuntime"
  },{
    "object_name" : "com.bea:ServerRuntime=MS1,Name=MS1,Type=JVMRuntime",
    "attributes" : [ "HeapFreePercent" ],
    "object_alias" : "Heap"
  }]
}

2-

{
  "host" : "1.0.0.2",
  "port" : 8888,
  "alias" : "srv2",
  "queries" : [
  {
    "object_name" : "java.lang:type=Memory",
    "attributes" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ],
    "object_alias" : "Memory"
  },{
    "object_name" : "java.lang:type=Threading",
    "attributes" : [ "ThreadCount", "TotalStartedThreadCount", "DaemonThreadCount", "PeakThreadCount" ],
    "object_alias" : "Threading"
  },{
    "object_name" : "com.bea:ServerRuntime=MS1,Name=ThreadPoolRuntime,Type=ThreadPoolRuntime",
    "attributes" : [ "StuckThreadCount", "Throughput", "QueueLength", "HoggingThreadCount", "StandbyThreadCount" ],
    "object_alias" : "ThreadPoolRuntime"
  },{
    "object_name" : "com.bea:ServerRuntime=MS1,Name=MS1,Type=JVMRuntime",
    "attributes" : [ "HeapFreePercent" ],
    "object_alias" : "Heap"
  }]
}

Note that without the aggregate filter, each attribute of each alias arrives in Elasticsearch as a separate event (see the sketch of a raw event below). To handle this, I used the aggregate filter: with just one alias it collects the multiple attributes into one event properly, but when I define two aliases it no longer collects the attributes of each alias into one event correctly.
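For context, a single raw event emitted by the jmx input looks roughly like this (metric_path and metric_value_number are the fields the plugin produces and that the filter above splits; the values here are illustrative, taken from the output further down, and the exact set of extra fields may differ):

{
       "metric_path" => "srv1.Memory.HeapMemoryUsage.used",
"metric_value_number" => 243034944,
              "type" => "jmx",
          "@version" => "1",
        "@timestamp" => 2019-07-22T04:00:10.834Z
}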

For example, in the case of using just one alias (number 1), starting Logstash leads to the following output, which is as expected:

"@timestamp" => 2019-07-22T04:00:10.834Z,
          "info" => [
        [ 0] {
            "HeapMemoryUsage_max" => 477626368
        },
        [ 1] {
            "HeapMemoryUsage_init" => 268435456
        },
        [ 2] {
            "HeapMemoryUsage_committed" => 428867584
        },
        [ 3] {
            "HeapMemoryUsage_used" => 243034944
        },
        [ 4] {
            "NonHeapMemoryUsage_max" => -1
        },
        [ 5] {
            "ThreadCount" => 70
        },
        [ 6] {
            "TotalStartedThreadCount" => 472
        },
        [ 7] {
            "StandbyThreadCount" => 14
        },
        [ 8] {
            "PeakThreadCount" => 74
        },
        [ 9] {
            "DaemonThreadCount" => 67
        },
        [10] {
            "NonHeapMemoryUsage_init" => 2555904
        },
        [11] {
            "NonHeapMemoryUsage_committed" => 275718144
        },
        [12] {
            "NonHeapMemoryUsage_used" => 271122640
        },
        [13] {
            "StuckThreadCount" => 0
        },
        [14] {
            "Throughput" => 5.497251374312843
        },
        [15] {
            "HoggingThreadCount" => 1
        },
        [16] {
            "QueueLength" => 0
        },
        [17] {
            "HeapFreePercent" => 48
        }
    ],
      "@version" => "1",
     "jmx_alias" => "srv1"

But using both aliases leads to the following result, which is not expected:

    "@timestamp" => 2019-07-22T04:03:48.998Z,
     "jmx_alias" => "srv1",
      "@version" => "1"
}
{
          "info" => [
        [0] {
            "NonHeapMemoryUsage_max" => -1
        },
        [1] {
            "StuckThreadCount" => 0
        }
    ],
    "@timestamp" => 2019-07-22T04:03:49.003Z,
     "jmx_alias" => "srv1",
      "@version" => "1"
}
{
          "info" => [
        [0] {
            "HeapMemoryUsage_used" => 234296944
        }
    ],
    "@timestamp" => 2019-07-22T04:03:49.009Z,
     "jmx_alias" => "srv2",
      "@version" => "1"
}
{
          "info" => [
        [0] {
            "NonHeapMemoryUsage_used" => 268786232
        }
    ],
    "@timestamp" => 2019-07-22T04:03:49.011Z,
     "jmx_alias" => "srv2",
      "@version" => "1"
}
{
          "info" => [
        [0] {
            "NonHeapMemoryUsage_init" => 2555904
        }
    ],
    "@timestamp" => 2019-07-22T04:03:49.001Z,
     "jmx_alias" => "srv1",
      "@version" => "1"
}
{
          "info" => [
        [0] {
            "HeapMemoryUsage_committed" => 445644800
        },
        [1] {
            "TotalStartedThreadCount" => 150
        }
    ],
    "@timestamp" => 2019-07-22T04:03:49.002Z,
     "jmx_alias" => "srv2",
      "@version" => "1"

How can I handle this issue?

You have used push_previous_map_as_event, which tells the aggregate filter to push the previous map as an event every time it sees a new task_id. Logstash does not maintain the order of events, even with '--pipeline.workers 1', so events for the two values of jmx_alias will get mixed up, and each change of task_id flushes a partial map.

Try using push_map_as_event_on_timeout with a timeout shorter than your polling interval.
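A minimal sketch of that change, keeping your code block as-is (the timeout of 3 seconds is an assumption, chosen only because it is below your polling_frequency of 5; timeout_task_id_field is the aggregate option that copies the task_id back onto the flushed event, replacing your map['jmx_alias'] assignment):

aggregate {
  task_id => "%{jmx_alias}"
  code => "
    map['info'] ||= []
    map['info'] << { event.get('jmx_att3') => event.get('metric_value_number') }
    event.cancel()
  "
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "jmx_alias"   # put the task_id back on the generated event
  timeout => 3                           # shorter than the 5-second polling_frequency
}

Note that the aggregate filter shares state across events, so it only behaves correctly with a single worker ('--pipeline.workers 1'), as mentioned above.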

Many thanks for your reply.
I tried your suggestion, but some of the data is still missing from the aggregated events. Is it possible to install multiple Logstash nodes in a production environment so that each one has one of the configuration files (Logstash 1 has the number 1 conf, Logstash 2 has the number 2 conf)?

You could certainly run multiple copies of Logstash. You could also run each configuration in its own pipeline in a single Logstash instance.
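As a rough sketch, the multiple-pipelines approach is set up in config/pipelines.yml; the pipeline ids and config paths below are hypothetical, and each .conf file would contain the full input/filter/output for one server:

- pipeline.id: srv1
  path.config: "C:/logstash/conf/srv1.conf"
  pipeline.workers: 1   # single worker keeps the aggregate filter safe
- pipeline.id: srv2
  path.config: "C:/logstash/conf/srv2.conf"
  pipeline.workers: 1

Each pipeline then runs its own jmx input and its own aggregate filter, so events from the two aliases can no longer interleave within one filter.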

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.