Unable to get Logstash to read from kafka input

In my setup I have Filebeat -> Kafka -> Logstash -> Elasticsearch.

I can see Filebeat producing logs into the Kafka topic, and I am also able to read logs from the topic using the Kafka console consumer. But I do not see Logstash reading anything from the configured topic. The Logstash logs show no errors, and with the configs below nothing gets printed to stdout or to the Logstash logs.
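For reference, this is the sort of console-consumer check I ran to confirm the topic has data (a sketch assuming a stock Kafka install directory; adjust the path for your installation):

# Read the topic from the beginning using the new consumer API
bin/kafka-console-consumer.sh --bootstrap-server 10.1.1.1:9092 --topic csos_services_file --from-beginning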

Any help is appreciated.

Version:

Filebeat: filebeat-5.2.0
Kafka: kafka_2.11-0.10.2.0
Logstash: logstash-5.1.1

Filebeat config:

output.kafka:
  hosts: ["10.1.1.1:9092"]
  topic: 'csos_services_file'
  compression: gzip
  max_message_bytes: 1000000

output.console:
  pretty: true
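To double-check that Filebeat is actually publishing, it can be run in the foreground with the publish debug selector (a sketch; filebeat.yml is assumed to be the config above):

# Run Filebeat in the foreground, log to stderr, and trace published events
./filebeat -e -c filebeat.yml -d "publish"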

Logstash config:

input {
  kafka {
    bootstrap_servers => "10.1.1.1:9092"
    type => "kafka-input"
    topics => ["csos_services_file"]
  }
}

filter {
}

output {
  stdout { codec => json }
  elasticsearch {
    hosts => ["10.6.0.4:9200"]
    user => "es_admin"
    password => "XXXX"
    index => "csos_services_index.%{+YYYY.MM.dd}"
  }
}
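When the full pipeline is silent like this, one way to narrow it down is to run the kafka input on its own with only a stdout output. A minimal sketch (same broker and topic as above; auto_offset_reset => "earliest" is added here so existing messages are re-read rather than only new ones):

bin/logstash -e '
input {
  kafka {
    bootstrap_servers => "10.1.1.1:9092"
    topics => ["csos_services_file"]
    auto_offset_reset => "earliest"
  }
}
output { stdout { codec => rubydebug } }'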

Logstash Logs:

[2017-04-12T05:14:47,049][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [10.1.1.1:2181]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = logstash
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = logstash
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest

[2017-04-12T05:14:47,161][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.10.0.1
[2017-04-12T05:14:47,161][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : a7a17cdec9eaa6c5

[2017-04-12T05:14:47,490][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>["http://~hidden~:~hidden~@10.6.0.4:9200"]}}
[2017-04-12T05:14:47,492][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:url=>#<URI::HTTP:0x39f67862 URL:http://~hidden~:~hidden~@10.6.0.4:9200>, :healthcheck_path=>"/"}
[2017-04-12T05:14:47,739][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x39f67862 URL:http://~hidden~:~hidden~@10.6.0.4:9200>}
[2017-04-12T05:14:47,762][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-04-12T05:14:47,829][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "omit_norms"=>true}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}, "fields"=>{"raw"=>{"type"=>"string", "index"=>"not_analyzed", "doc_values"=>true, "ignore_above"=>256}}}}}, {"float_fields"=>{"match"=>"*", "match_mapping_type"=>"float", "mapping"=>{"type"=>"float", "doc_values"=>true}}}, {"double_fields"=>{"match"=>"*", "match_mapping_type"=>"double", "mapping"=>{"type"=>"double", "doc_values"=>true}}}, {"byte_fields"=>{"match"=>"*", "match_mapping_type"=>"byte", "mapping"=>{"type"=>"byte", "doc_values"=>true}}}, {"short_fields"=>{"match"=>"*", "match_mapping_type"=>"short", "mapping"=>{"type"=>"short", "doc_values"=>true}}}, {"integer_fields"=>{"match"=>"*", "match_mapping_type"=>"integer", "mapping"=>{"type"=>"integer", "doc_values"=>true}}}, {"long_fields"=>{"match"=>"*", "match_mapping_type"=>"long", "mapping"=>{"type"=>"long", "doc_values"=>true}}}, {"date_fields"=>{"match"=>"*", "match_mapping_type"=>"date", "mapping"=>{"type"=>"date", "doc_values"=>true}}}, {"geo_point_fields"=>{"match"=>"*", "match_mapping_type"=>"geo_point", "mapping"=>{"type"=>"geo_point", "doc_values"=>true}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "doc_values"=>true}, "@version"=>{"type"=>"string", "index"=>"not_analyzed", "doc_values"=>true}, "geoip"=>{"type"=>"object", "dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip", "doc_values"=>true}, "location"=>{"type"=>"geo_point", "doc_values"=>true}, "latitude"=>{"type"=>"float", "doc_values"=>true}, "longitude"=>{"type"=>"float", "doc_values"=>true}}}}}}}}
[2017-04-12T05:14:47,837][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["10.6.0.4:9200"]}
[2017-04-12T05:14:47,850][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>125}
[2017-04-12T05:14:47,857][INFO ][logstash.pipeline ] Pipeline main started
[2017-04-12T05:14:47,935][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

Any help is appreciated on this one.

Try adding a group_id to your input plugin:

input {
  kafka {
    bootstrap_servers => "10.1.1.1:9092"
    type => "kafka-input"
    topics => ["csos_services_file"]
    auto_offset_reset => "earliest"
    group_id => "Dexter"
  }
}
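With a fresh group_id and auto_offset_reset => "earliest", the consumer has no committed offsets, so it should re-read the topic from the start rather than waiting for new messages. You can also check from the Kafka side whether the group is consuming at all, along these lines (a sketch; the group name matches the config above, and on 0.10.x brokers the --new-consumer flag may be needed):

# Show assigned partitions, current offsets, and lag for the group
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.1.1.1:9092 --describe --group Dexter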

Thanks for the suggestion, but it didn't work with group_id either.

Is the kafka input supported with Logstash 5.1.1? Also, no debug logs are seen on Logstash.
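For reference, debug output can be enabled at startup like this (a sketch; the install path and config file name are assumptions for my setup):

# Start Logstash at debug level so the kafka input logs its polling activity
bin/logstash --log.level debug -f /etc/logstash/conf.d/kafka.conf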

Filebeat: filebeat-5.2.0
Kafka: kafka_2.11-0.10.2.0
Logstash: logstash-5.1.1

