How to aggregate metrics from Netdata in Logstash?

Thank you in advance for any help!

I'm using Netdata to collect metrics from servers and then send them to Logstash and on to Elasticsearch.

My need is to aggregate metrics that share the same fields and create a single event in nested format.

I also opened a topic in:

This is an example of the input from Netdata:

{"host":"centosdns","@version":"1","port":52212,"@timestamp":"2019-01-19T16:16:22.117Z","message":"netdata.centosdns.disk_await.centos_swap.reads 0.0000000 1547914548"}
{"host":"centosdns","@version":"1","port":52212,"@timestamp":"2019-01-19T16:16:22.117Z","message":"netdata.centosdns.disk_await.centos_swap.writes 0.0000000 1547914548"}
{"host":"centosdns","@version":"1","port":52212,"@timestamp":"2019-01-19T16:16:22.117Z","message":"netdata.centosdns.disk_await.centos_root.reads 0.0000000 1547914548"}
{"host":"centosdns","@version":"1","port":52212,"@timestamp":"2019-01-19T16:16:22.117Z","message":"netdata.centosdns.disk_await.centos_root.writes 0.0000000 1547914548"}

My Logstash config file looks like this:

input {
    tcp {
      port => 1234
    }
}

filter {
    # Take the 'message' field and split it into separate fields
    grok {
        named_captures_only => "true"
        pattern_definitions => {
            "CHART" => "[a-z]\w+"
            "FAMILY" => "[_a-z0-9]+"
        }
        match => {
            "message" => "%{WORD:prefix}\.%{WORD:hostname}\.%{CHART:chart}\.%{FAMILY:family}\.%{NOTSPACE:dimension} %{NUMBER:val} %{NUMBER:timestamp}"
        }
    }

    if "_grokparsefailure" not in [tags] {
        mutate {
            remove_field => [ "@version", "host", "port", "prefix" ]
        }

        # Attempt to create a nested field and then aggregate
        mutate {
            id => "chart_field"
            add_field => { "[%{chart}][%{family}][%{dimension}][value]" => "%{val}" }
        }

        aggregate {
            task_id => "[%{chart}][%{family}]"
            code => "
                # I tried many code variants here to aggregate, without success
                event.cancel()
            "
            push_previous_map_as_event => true
            timeout => 5
        }

        mutate {
            # Remove unnecessary fields
            id => "netdata_mutate_remove"
            remove_field => [ "timestamp", "message" ]
        }
    } else {
        drop {}
    }
}

output {
    # TESTING PURPOSES
    if "_aggregateexception" in [tags] {
        file {
            path => "/var/log/logstash/netdata/aggregatefailures-%{+MM-dd}.log"
        }
    } else {
        file {
            path => "/var/log/logstash/netdata/netdata-%{+MM-dd}-aggregate.log"
        }
    }

    stdout { codec => rubydebug }
}
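
For reference, the grok pattern above extracts these fields from the first sample message (shown rubydebug-style; the original host/port/@version/prefix fields are dropped by the first mutate):

     "hostname" => "centosdns",
        "chart" => "disk_await",
       "family" => "centos_swap",
    "dimension" => "reads",
          "val" => "0.0000000",
    "timestamp" => "1547914548"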

Take these two messages from the input above:

"netdata.centosdns.disk_await.centos_swap.reads 0.0000000"
"netdata.centosdns.disk_await.centos_swap.writes 0.0000000"

My objective is to make a nested field like this:

disk_await: {            # Chart
  centos_swap: [         # Family
    reads  => 0.0000000, # Dimension => Value
    writes => 0.0000000  # Dimension => Value
  ]
}

I intend to aggregate every 'Dimension => Value' pair under the same 'Chart'/'Family'. This is only four lines of metrics, but in reality we are talking about 1000 per second or more in some cases, and all the metric names are dynamic, so it's virtually impossible to know them all in advance.

At this moment I'm using:

Logstash v6.5.4 on a VirtualBox CentOS 7 minimal install
All plugins (inputs/filters/outputs) updated

I would remove the 'not' in the test for _grokparsefailure; it's then easier to see that we drop unmatched lines and otherwise do all the complicated stuff. The only events that come out of the pipeline are the ones generated by aggregate, so there is no reason to remove fields. So...

if "_grokparsefailure" in [tags] {
    drop{}
} else {
    aggregate {
        task_id => "[%{chart}][%{family}]"
        code => '
            n = event.get("chart") + "." + event.get("family")
            map[n] ||= []
            map[n] << { event.get("dimension") => event.get("val") }
            event.cancel()
        '
        push_previous_map_as_event => true
        timeout => 5
    }
    de_dot { nested => true }

That will get you:

"disk_await" => {
    "centos_root" => [
        [0] {
            "reads" => "0.0000000"
        },
        [1] {
            "writes" => "0.0000000"
        }
    ]
}

Not sure if that is what you want.

Thank you so much for your help.

This code is very useful, but one thing you use is the de_dot plugin. From what I understand, de_dot is very CPU-intensive, and I'm already forcing Logstash to use a single pipeline worker so that aggregate works correctly, so I don't know how this will perform.
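
(For context, this is the setting I mean: the aggregate filter keeps its map in memory shared across events, so the docs say to run it with a single worker. A minimal sketch, assuming the usual logstash.yml location:)

# /etc/logstash/logstash.yml (path may differ per install)
# aggregate needs all events to pass through the same worker thread,
# otherwise events sharing a task_id may land in different maps.
# Equivalent to starting Logstash with `-w 1`.
pipeline.workers: 1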

Over the last couple of days I've been learning Ruby, and I wrote this code (I know it doesn't look pretty, but what I want is something that works :wink:):

aggregate {
  task_id => "[%{hostname}][%{chart}][%{family}]"
  code => "
    tmp_all ||= {}
    tmp_family ||= {}

    host_tmp = event.get('hostname')
    chart_tmp = event.get('chart')
    family_tmp = event.get('family')
    dimen_tmp = event.get('dimension')
    dimen_list ||= []

    tmp_all = event.get(host_tmp)
    tmp_family = tmp_all[chart_tmp]

    if (tmp_all.has_key? chart_tmp)
      # Test if chart_tmp is present in host_tmp
      if (tmp_all[chart_tmp].has_key? family_tmp)
        # Test if family_tmp is present in chart_tmp
        if (tmp_all[chart_tmp][family_tmp].has_key? dimen_tmp)
          # Test if dimen_tmp is present in family_tmp
          tmp_all.to_hash.each do |key, value|
            # key = chart | value = family{dim => val}
            value.to_hash.each do |k, v|
              # k = centos_root | v = {operations => 0.0000000}
              v.to_hash.each do |dim, val|
                # dim = dimension | val = value
                dimem_final = dim
                value_final = val
                if !(tmp_family.has_key? dimem_final)
                  # Check the dimension so it is not duplicated
                  tmp_family = { dimem_final => value_final }
                end
              end
            end
          end
        end
      end
    end
    map[chart_tmp] = tmp_family
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 5
}

The problem here is that 'tmp_family' never ends up holding more than one 'dimem_final => value_final' pair.
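
(Looking at it again, I suspect the reassignment is the issue: 'tmp_family = { dimem_final => value_final }' replaces the whole hash on every pass instead of adding a key. A minimal Ruby sketch of what I think the accumulation should do instead, assuming the goal is one map entry per chart/family holding every dimension; field names are the ones from my filter above:)

# Sketch only -- merge each dimension into the shared aggregate map
# instead of reassigning a local hash on every event:
chart_tmp  = event.get('chart')
family_tmp = event.get('family')
map[chart_tmp] ||= {}
map[chart_tmp][family_tmp] ||= {}
map[chart_tmp][family_tmp][event.get('dimension')] = event.get('val')
event.cancel()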


Once again I appreciate your help, but would you mind taking a look?

I will try your code to see how it performs and whether it catches all events.
