Kafka Input Tutorial for Logstash 5.1.1

Is there any third-party documentation on how to use the Kafka input plugin? Below is my Logstash config:

input {
  kafka {
    bootstrap_servers => "192.168.31.55:9092,192.168.31.54:9092"
    topics => ["test-beat33"]
  }
}

filter {
}

output {
  stdout { codec => rubydebug }
}

Below is what I see on the console:

c:\test\logstash-script>..\logstash-5.1.1\bin\logstash -f test.conf
Could not find log4j2 configuration at path /Users/A/test/logstash-5.1.1/config/log4j2.properties. Using default config which logs to console
16:51:46.992 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
16:51:47.009 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
16:51:47.106 [[main]<kafka] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [192.168.31.55:9092, 192.168.31.54:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = logstash
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = logstash
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest

16:51:47.191 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
16:51:47.249 [[main]<kafka] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
(same ConsumerConfig values as above)

16:51:47.339 [[main]<kafka] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
16:51:47.341 [[main]<kafka] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
16:51:47.513 [Ruby-0-Thread-16: C:/Users/A/test/logstash-5.1.1/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.0/lib/logstash/inputs/kafka.rb:225] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator 192.168.31.54:9092 (id: 2147483646 rack: null) for group logstash.
16:51:47.519 [Ruby-0-Thread-16: C:/Users/A/test/logstash-5.1.1/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.0/lib/logstash/inputs/kafka.rb:225] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group logstash
16:51:47.523 [Ruby-0-Thread-16: C:/Users/A/test/logstash-5.1.1/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.0/lib/logstash/inputs/kafka.rb:225] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group logstash
16:51:47.556 [Ruby-0-Thread-16: C:/Users/A/test/logstash-5.1.1/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.0/lib/logstash/inputs/kafka.rb:225] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group logstash with generation 5
16:51:47.561 [Ruby-0-Thread-16: C:/Users/A/test/logstash-5.1.1/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.0/lib/logstash/inputs/kafka.rb:225] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [test-beat33-0] for group logstash

Despite seeing so many INFO messages, I cannot see any events in the console. When I check the Kafka server using kafka-console-consumer I can see my logs.
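
One way to test the pipeline end to end is to send a fresh message while Logstash is running, using the console producer that ships with Kafka (this assumes a stock Kafka 0.10 install on one of the broker hosts; adjust the broker address and topic to match):

# run from the Kafka install directory; type a test line and press Enter
bin/kafka-console-producer.sh --broker-list 192.168.31.55:9092 --topic test-beat33

If that fresh message appears in the rubydebug output, the connection works and only the older messages are being skipped.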

Is there any third-party tutorial or documentation that I can use? Is there something wrong with Logstash, or is there some undocumented detail about its usage?

Which console are you referring to? Your Logstash console?

Could not find log4j2 configuration at path /Users/A/test/logstash-5.1.1/config/log4j2.properties. Using default config which logs to console

You can try to start Logstash as:

..\logstash-5.1.1\bin\logstash -f test.conf --path.settings=dir_name_where_logstash.yml_is_located --config.debug --log.level debug

after which you can see the logs in the Logstash logs directory.
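
For example, if logstash.yml sits in the config directory of the install, the invocation would look like this (a hypothetical path, not from the original post; substitute your own):

..\logstash-5.1.1\bin\logstash -f test.conf --path.settings=..\logstash-5.1.1\config --config.debug --log.level debug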

I know what the problem is. Any data that arrived in Kafka before Logstash started is automatically ignored by Logstash; only data that arrives after Logstash has started is processed.

It looks like the default Logstash configuration will result in data loss if the Logstash server goes down or needs to be restarted in a production environment.

Is there any way to solve this problem? It would defeat the purpose of using Kafka.

See https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-enable_auto_commit. By default this is set to true, so Kafka tracks how far the logstash consumer group has read and remembers its offset position. Does this answer your question?
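
For the restart concern above, a minimal sketch of the input (same brokers and topic as the original config; group_id and enable_auto_commit are spelled out here even though they are the plugin defaults):

input {
  kafka {
    bootstrap_servers => "192.168.31.55:9092,192.168.31.54:9092"
    topics => ["test-beat33"]
    # stable group id, so committed offsets survive restarts
    group_id => "logstash"
    # applies only when the group has no committed offset yet:
    # start from the oldest available message instead of the default "latest"
    auto_offset_reset => "earliest"
    # commit consumed offsets back to Kafka (the default; note that
    # logstash-input-kafka 5.x takes this option as a string)
    enable_auto_commit => "true"
  }
}

With a stable group_id and auto-commit enabled, a restarted Logstash resumes from the last committed offset rather than losing whatever arrived while it was down; auto_offset_reset => "earliest" additionally picks up messages produced before the group ever existed.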
