JSON is being stored as a string in ES

The ELK pipeline I created is Filebeat -> Kafka -> Logstash -> ES & Kibana (AWS Elasticsearch Service).

The logs are generated in JSON format and pushed to Kafka with the config below:

=========================
filebeat.prospectors:
- input_type: log
  paths:
    - /srv/my_app/src/logs/*.log
  document_type: my_app

path.home: /opt/filebeat-5.4.2-linux-x86_64/
path.config: ${path.home}
path.data: ${path.home}/data
filebeat.registry_file: ${path.data}/registry

logging.level: debug
logging.to_files: true
logging.to_syslog: false
logging.files:
  path: /var/log/mybeat
  name: mybeat.log
  keepfiles: 7

output.kafka:
  enabled: true
  hosts: ["myhost.kafka1", "myhostkafka2", "myhostkafka3"]
  topic: my_app
  version: 0.9.0.0
  worker: 2
  compression: gzip

logging.to_files: true
logging.files: /tmp/filebeat_kafka.log
============================================
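Side note: since the app writes one JSON object per line, Filebeat 5.x can also decode the JSON itself via the json.* prospector settings, so the fields arrive already structured instead of as one long string. A minimal sketch of that alternative (an assumption on my part, not part of the setup above):

filebeat.prospectors:
- input_type: log
  paths:
    - /srv/my_app/src/logs/*.log
  document_type: my_app
  # decode each line as JSON and promote its keys to top-level event fields
  json.keys_under_root: true
  # tag the event with an error key if a line cannot be parsed as JSON
  json.add_error_key: true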

The Logstash config is below:

input {
  kafka {
    bootstrap_servers => "myhost.kafka1,myhostkafka2,myhostkafka3"
    topics => ["my_app"]
    codec => json
  }
}

output {
  if [type_name] == "my_app" {
    elasticsearch {
      hosts => ["eshost1"]
      index => "%{[type_name]}-%{+YYYY.MM.dd}"
      document_type => "%{[type_name]}"
    }
  }
}

filter {
  mutate {
    add_field => { "index_name" => "%{type}" }
    add_field => { "type_name" => "%{type}" }
  }
  if [@metadata][type] == "my_app" {
    json {
      source => "message"
    }
    date {
      match => [ "time", "ISO8601" ]
    }
  }
}

The issue is that the message field contains JSON, but it is stored in ES as a plain string (I can see that through Kibana; image attached below).

Sample format of message:

{"name":"xxxx","vertical":"xxxx","clusterid":50,"hostname":"xxxx","pid":7833,"route":"xxxxx","uuid":"xxxx","level":30,"opened_from":"xxx","osVersion":"5.1.1","imei":"xxxxxx","networkType":"4G","language":"en","long":"81.4510731","lat":"25.748469","playStore":"true","version":"5.9.1","client":"androidapp","deviceName":"vivo_V3","deviceManufacturer":"vivo","deviceIdentifier":"vivo-vivoV3-862738034446452","sso_token":"fa7e30a5-28b3-44fd-b528-7ee465817cd7","client_ip":"xxxxxxxx","user_agent":"Dalvik/2.1.0 (Linux; U; Android 5.1.1; vivo V3 Build/LMY47V)","path":"xxxxxxxx","url":"xxxxx","verb":"GET","operation":"xxxxx","point":"exit","api_method":"xxxx","response_code":"xxxxx","response_time":9,"status_code":xxxx,"msg":"xxxx","time":"2017-07-13T16:52:16.156Z","src":{"file":"xxxxx","line":xxxxx},"v":0}

Please help me in figuring this out.

I have been struggling with the same issue for a while.
Same setup, but with Metricbeat as the source. I can't visualize anything if I use codec => json on the kafka input; I get zero data in Kibana even though my indexing rate is very high!

I'm using 5.4 for everything across my cluster.

Unless I do this:

filter {

    json {
      source => "message"
    }
}

the input's codec => json won't work and everything in ES is just raw text.

If I use the filter above, the data in ES gets backed up and severely delayed, to the point where everything ends up with the same timestamp!

Any luck with your issue?

if [@metadata][type] == "my_app" {

I don't see [@metadata][type] being populated anywhere. Is this condition ever true?

I'd expect the kafka input's codec => json to do the job, though.
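Concretely, that would be something like this on the input side (a sketch only, reusing the broker names from the config above). Keep in mind that Filebeat's Kafka output wraps each log line in its own JSON event, so after the codec decodes that envelope, the application's JSON is still a string inside the message field:

input {
  kafka {
    bootstrap_servers => "myhost.kafka1,myhostkafka2,myhostkafka3"
    topics => ["my_app"]
    # decodes the JSON event that Filebeat wrote to the topic;
    # the original application line remains a string in [message]
    codec => json
  }
}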

Here is what resolved my issue; hopefully it will help you. I'm running LS 5.4 and this fixed the json codec not working:

bin/logstash-plugin update logstash-input-kafka

Update your LS Kafka input plugin.

Stop LS, then run the command above.
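If it helps, you can confirm which plugin versions are installed after the update (assuming the stock logstash-plugin CLI in the Logstash home directory):

bin/logstash-plugin list --verbose | grep kafka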

BTW this may not work:

if [@metadata][type] == "my_app" {
  json {
    source => "message"
  }

check this topic:

You guys are right.

After changing the filter to not check the metadata (it is not there in the logs), it started working.
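For anyone who hits the same thing, the filter now looks roughly like this (just the relevant part, with the [@metadata][type] condition removed):

filter {
  # the application's JSON line arrives as a string in [message]; parse it into fields
  json {
    source => "message"
  }
  # use the app's own "time" field (ISO8601) as the event timestamp
  date {
    match => [ "time", "ISO8601" ]
  }
}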

Thank you Guys. :slight_smile:
