Parsing filebeat -> kafka -> logstash

Hi,

I am using the following versions of software:
filebeat: 5.4.0
kafka: 0.10.0.1
logstash: 5.4.0

When I directly connect filebeat to logstash my log's fields such as host and source get parsed sensibly, eg:

{
  "_index": "logstash-2017.05.15",
  "_type": "log",
  "_id": "AVwLa6bjfzTWzNZQ1MEK",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2017-05-15T09:23:40.739Z",
    "offset": 13692098,
    "@version": "1",
    "beat": {
      "hostname": "host.domain.com",
      "name": "host.domain.com",
      "version": "5.4.0"
    },
    "input_type": "log",
    "host": "host.domain.com",
    "source": "/var/log/messages",
    "message": "May 15 10:23:39 host root: test",
    "type": "log",
    "tags": [
      "beats_input_codec_plain_applied"
    ]
  },
  "fields": {
    "@timestamp": [
      1494840220739
    ]
  },
  "sort": [
    1494840220739
  ]
}

When I use kafka as transport in between filebeat and logstash pretty much all the information remains in the message field, eg

{
  "_index": "logstash-2017.05.15",
  "_type": "logs",
  "_id": "AVwLRbwQL-zbtw-GlIKo",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2017-05-15T08:42:18.502Z",
    "@version": "1",
    "message": "{\"@timestamp\":\"2017-05-15T08:42:16.927Z\",\"beat\":{\"hostname\":\"host.domain.com\",\"name\":\"host.domain.com\",\"version\":\"5.4.0\"},\"input_type\":\"log\",\"message\":\"May 15 09:42:16 host root: test\",\"offset\":10735391,\"source\":\"/var/log/messages\",\"type\":\"log\"}"
  },
  "fields": {
    "@timestamp": [
      1494837738502
    ]
  },
  "sort": [
    1494837738502
  ]
}

My configuration files:
logstash.conf

input {
        beats {
                port => 5001
        }

        kafka {
                bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
                topics            => [ "logging" ]
        }
}

filter {
}

output {
        elasticsearch {
                hosts => "elasticsearch:9200"
        }
}

filebeat.yml

filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/*.log
    - /var/log/messages

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: logging
  version: 0.10.0

output.logstash:
  hosts: ["10.64.3.108:5001"]

Does anyone know how I can configure logstash/filebeats so that I get the same behaviour when using kafka as transport as I do without kafka?

solution was to set the codec to json for the kafka input:

        kafka {
                bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
                codec             => "json"
                topics            => [ "logging" ]
        }
1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.