Logstash 5.2 input kafka working but no output to elasticsearch

Issue Summary:

  • filebeat outputs to kafka 0.10 topic
  • logstash indexer inputs from kafka 0.10
  • outputs to elasticsearch

Problem: document count on elasticsearch doesn't change

Note: I'm able to get filebeat=>logstash=>elasticsearch working ok. When I try to add the kafka broker and adjust input/output accordingly, the content consumed by logstash doesn't output to elasticsearch.


logstash version

# /usr/share/logstash/bin/logstash -V
logstash 5.2.0

installed from elastic.co repository on CentOS

# grep baseurl /etc/yum.repos.d/elastic-co.repo

# cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)


FileBeat agent


- input_type: log
    - /var/log/app1/app1.log
    - /var/log/app2/debug.log
    - /var/log/app2/system.log
    - /var/log/app2/app2.log
  document_type: clusterlogs
#================================ Outputs =====================================
    enabled: false 
    hosts: ["logstash-indexer1-fqdn:5044","logstash-indexer2-fqdn:5044"]
    loadbalance: true

    enabled: true
    hosts: ["kafka3-fqdn:9092", "kafka1-fqdn:9092", "kafka2-fqdn:9092"]
    topic: '%{[type]}'
#================================ Logging =====================================
logging.level: warning
logging.to_files: true
logging.to_syslog: false
  path: /var/log
  name: filebeat.log
  keepfiles: 3
logging.selectors: ["*"]


path.data: /var/lib/logstash
pipeline.workers: 6
path.config: /etc/logstash/conf.d
log.level: warn
path.logs: /var/log/logstash

files in /etc/logstash/conf.d

input {
  kafka {
    bootstrap_servers => "kafka3-fqdn:9092,kafka1-fqdn:9092,kafka1-fqdn:9092"
    topics => ["clusterlogs"]
    client_id => "logstash-hostname"
    group_id => "logstash_indexer"
    add_field => {
      "log_origin" => "kafka"
# this file intentionally left blank
output {
  elasticsearch {
    hosts => "elastic-search-client-node:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"


Note that I cant switch from kafka to logstash output on filebeat and everything works as expected.

Is the data in kafka? Is it pulling it from there?

Is the data in kafka? Is it pulling it from there?

filebeat pushes to kafka. the topic 'clusterlogs' was automatically created in kind.
the data is in kafka as shown via bin/kafka-console-consumer.sh
when i turn on debugging in logstash, i can see what's mentioned in the DEBUG log that i linked to. the DEBUG log, to me, plainly shows data being pulled. Example:

[2017-02-10T22:11:30,199][DEBUG][logstash.pipeline        ] filter received {"event"=>
{"@timestamp"=>2017-02-10T22:11:30.159Z, "log_origin"=>"kafka", "@version"=>"1", 
\"input_type\":\"log\",\"message\":\" WARN [ScheduledTasks:1] 2017-02-10 16:11:16,486
GCInspector.java (line 142) Heap is 0.981591859186788 full.  You may need to reduce memtable
and/or cache sizes.  app2 will now flush up to the two largest memtables to free up memory.
Adjust flush_largest_memtables_at threshold in app2.yaml if you don't want app2 to do this

the issue is the logstash doesn't output to elasticsearch.

Add a stdout and see if anything makes it through your pipeline then.

would there be a difference between stdout and the debug logging already posted?

here's the exposed link http://pastebin.com/ibmVj3GJ



if anyone is interested, there seems to be an issue when using the kafka plugin.

my logstash output to elastic is

output {
  elasticsearch {
    hosts => "elastic-search-client-node:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"

which ended up creating the following index in elasticsearch.

(es index when using kafka input plugin)

when i use the filebeat input, i get

(es index when using filebeat input plugin)

i'm currently in the middle of setting document_types and index/topics manually to get everything to line up and will report back, if this works.


in the event someone else runs into this it looks like when one uses the kafka input plugin, logstash doesn't expand @metadata fields anymore. to work around this, i'm using ansible vars since i'm templating these files out from an ansible playbook. You may have to set them to static values and manage accordingly.

logstash config:

output_elastic (@metadata doesn't parse):

#    index => "%{[@metadata][type]}-%{+YYYY.MM.dd}"
# ^^^ changed to ...
index => "filebeat-%{+YYYY.MM.dd}"

input kafka
the doc_and_topic_via_ansible var is set via ansible.

# make sure 'topics' field matches 'doctype' throughout pipeline
input {
  kafka {
    bootstrap_servers => "blablabla"
    topics => ["{{ doc_and_topic_via_ansible }}"]
    client_id => "{{ ansible_hostname }}"
    codec => json
    group_id => "logstash_indexer"
    add_field => {
      "broker" => "input_from_kafka"

this creates the elasticsearch index as expected and everything works with one minor exception. Events go into elasticsearch with _type set as the non-expanded value below:

_type	   	%{[@metadata][type]}

instead of the expected value of "filebeat-" as expected by setting that field in filebeat.yml shipper running on endpoint machines with document_type in the input section.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.