Logstash not forwarding logs to ES via Kafka

I'm using ELK 5.0.1 and Kafka 0.10.1.0. I'm not sure why my logs aren't being forwarded. I installed kafkacat and was able to produce and consume logs successfully from all three servers where the Kafka cluster is installed.

shipper.conf

input {
  file {
    start_position => "beginning"
    path => "/var/log/logstash/logstash-plain.log"
  }
}

output {
  kafka {
    topic_id => "stash"
    bootstrap_servers => "<i.p1>:9092,<i.p2>:9092,<i.p3>:9092"
  }
}

receiver.conf

input {
  kafka {
    topics => ["stash"]
    group_id => "stashlogs"
    consumer_threads => 1
    bootstrap_servers => "<i.p1>:2181,<i.p2>:2181,<i.p3>:2181"
  }
}

output {
  elasticsearch {
    hosts => ["<eip>:9200","<eip>:9200","<eip>:9200"]
    manage_template => false
    index => "logstash-%{+YYYY.MM.dd}"
  }
}

Logs: I'm getting the warnings below in logstash-plain.log:

[2017-04-17T16:34:28,238][WARN ][org.apache.kafka.common.protocol.Errors] Unexpected error code: 38.
[2017-04-17T16:34:28,238][WARN ][org.apache.kafka.clients.NetworkClient] Error while fetching metadata with correlation id 44 : {stash=UNKNOWN}

@luciferdude Kafka 0.10.1.0 is not compatible with LS 5.x. LS 5.x is compatible with 0.10.0.

To use Kafka 0.10.1.0, you need to manually install a newer version of the Kafka input plugin:

bin/logstash-plugin install --version 6.2.7 logstash-input-kafka

Hi @suyograo

Thank you for the reply. I installed version 6.2.7 of the Kafka input plugin and still get the same result. There are no errors in the logs, but they are not being forwarded to ELK. I restarted Logstash on both the shipper and the receiver, with the same result.

logstash logs from receiver

[2017-04-18T04:01:12,698][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.10.1.1
[2017-04-18T04:01:12,699][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : f10ef2720b03b247
[2017-04-18T04:01:13,031][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>["http://eip1:9200", "http://eip2:9200", "http://eip3:9200"]}}
[2017-04-18T04:01:13,035][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["eip1:9200", "eip2:9200", "eip3:9200"]}
[2017-04-18T04:01:13,038][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-04-18T04:01:13,043][INFO ][logstash.pipeline        ] Pipeline main started
[2017-04-18T04:01:13,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

logstash logs from shipper.

[2017-04-18T03:35:57,620][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.10.0.1
[2017-04-18T03:35:57,620][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : a7a17cdec9eaa6c5
[2017-04-18T03:35:57,626][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-04-18T03:35:57,645][INFO ][logstash.pipeline        ] Pipeline main started
[2017-04-18T03:35:57,692][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Version of Kafka on 3 server cluster:

grep "Kafka version" /var/log/kafka/*
/var/log/kafka/kafkaServer.out:[2017-04-18 02:54:24,941] INFO Kafka version : 0.10.1.0 (org.apache.kafka.common.utils.AppInfoParser)

Regarding "logstash logs from shipper": this still shows Kafka 0.10.0.1.

I don't know exactly what you mean by "shipper", but assuming you are using a Kafka output, you also need to update it:

bin/logstash-plugin install --version 6.1.5 logstash-output-kafka

Then make sure you are writing some data to Kafka. By default, we only start consuming at the end of the topic, not the beginning.
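
As a rough sketch of what reading from the start of the topic could look like on the consuming side: the Kafka input has an `auto_offset_reset` option that controls where a consumer group with no committed offsets starts reading. The fragment below reuses the topic and group names from receiver.conf. The broker placeholders and port 9092 are copied from shipper.conf and are an assumption about where the brokers listen (2181 is conventionally the ZooKeeper port, while `bootstrap_servers` expects Kafka brokers).

```
input {
  kafka {
    topics => ["stash"]
    group_id => "stashlogs"
    # Assumption: the brokers listen on 9092, as in shipper.conf.
    bootstrap_servers => "<i.p1>:9092,<i.p2>:9092,<i.p3>:9092"
    # Only takes effect when the group has no committed offsets yet.
    auto_offset_reset => "earliest"
  }
}
```

If the group `stashlogs` has already committed offsets, this setting has no effect; testing with a fresh, hypothetical `group_id` is one way to rule that out.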

@suyograo, thanks for the reply. Yes, I've been wondering the same. I have 6.2.7 for both the input and the output plugin. Yes, I'm writing to a Kafka output, which I'll downgrade now and test.

Yes, I did a dummy write to the log file after making the changes and restarting Logstash.

@suyograo

I retried after downgrading the Kafka output plugin to 6.1.5 and still get the same result.

[2017-04-18T12:58:54,770][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.10.1.1
[2017-04-18T12:58:54,772][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : f10ef2720b03b247
[2017-04-18T12:58:54,778][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-04-18T12:58:54,785][INFO ][logstash.pipeline        ] Pipeline main started
[2017-04-18T12:58:54,825][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Output:

bin/logstash-plugin list --verbose logstash-output-kafka
logstash-output-kafka (6.1.5)

Input:

bin/logstash-plugin list --verbose logstash-input-kafka
logstash-input-kafka (6.2.7)
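
To separate a consumption problem from an indexing problem, one option is to temporarily print events on the receiver alongside the Elasticsearch output. This is a debugging sketch, not part of the original configuration; it only assumes the `stdout` output and `rubydebug` codec that ship with Logstash, and it reuses the Elasticsearch settings from receiver.conf unchanged.

```
output {
  # Temporary: print every consumed event to standard output.
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["<eip>:9200","<eip>:9200","<eip>:9200"]
    manage_template => false
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
```

If nothing is printed after writing a test line on the shipper, events are not reaching the Kafka input. If events are printed but no index appears, the problem is more likely on the Elasticsearch side.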
