Using the Elastic Stack with Kafka for reading system metrics

Hello,
Below is the flow of my Elastic stack (setup1)
metricbeat (system module) -> kafka cluster -> logstash -> elasticsearch -> kibana

In Kibana I get the metrics in the form of a single message, like below:

message:{"@timestamp":"2019-02-19T11:32:07.531Z","@metadata":{"beat":"metricbeat","type":"doc","version":"6.6.0","topic":"Nutanix2"},"event":{"dataset":"system.process","duration":21110912},"system":{"process":{"pid":301,"username":"root","cpu":{"total":{"value":2.31066e+06,"pct":0.001,"norm":{"pct":0.0003}},"start_time":"2018-11-12T06:04:08.000Z"},"memory":{"size":0,"rss":{"pct":0,"bytes":0} }

If I send the metrics from metricbeat (system module) -> Elasticsearch -> Kibana, they come in as field:value pairs rather than as a single message, and I can also use the dashboards for visualization. For example:

@timestamp:February 19th 2019, 13:44:36.895
beat.name:ecs-e976.novalocal
beat.hostname:ecs-e976.novalocal
beat.version:6.2.3
metricset.module:system
metricset.rtt:73
metricset.name:network
system.network.name:eth0
system.network.in.bytes:2.188GB
system.network.in.packets:19,695,275
system.network.in.errors:0
system.network.in.dropped:0
system.network.out.dropped:0

My question is: can I do something in setup1 that will allow Kibana to get the metrics as field:value pairs (other than writing a grok filter)?

My metricbeat output settings:
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["192.168.1.63:9092", "192.168.1.61:9092", "192.168.20.2:9092"]

  # message topic selection + partitioning
  topic: 'Nutanix2'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

My Logstash conf file:

input {
  kafka {
    id => "my_id"
    bootstrap_servers => "192.168.1.63:9092,192.168.1.61:9092,192.168.20.2:9092"
    topics => ["Nutanix2"]
  }
}

output {
  elasticsearch {
    hosts => ["192.168.1.231:9200"]
    index => "Nutanix2-%{+YYYY.MM.dd}"
  }

  file {
    path => "/tmp/lokaf.out"
    codec => line { format => "custom format: %{message}" }
  }
}

Any help is appreciated.

Either add codec => json to the kafka input, or add

  filter { json { source => "message" } }

to your Logstash pipeline. Metricbeat writes each event to Kafka as a JSON document, so once Logstash decodes the message field the individual fields will appear in Kibana just as they do when Metricbeat ships directly to Elasticsearch.
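A minimal sketch of the corrected input block, assuming the same brokers, topic, and id as in the original configuration; only the codec line is new:

  input {
    kafka {
      id => "my_id"
      bootstrap_servers => "192.168.1.63:9092,192.168.1.61:9092,192.168.20.2:9092"
      topics => ["Nutanix2"]
      # decode each Kafka record as JSON so the Beats fields become top-level event fields
      codec => json
    }
  }

With the codec in place the separate json filter is not needed; use one approach or the other.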

Thanks a ton. Appreciate it. 🙂
