Logstash consumes the Kafka topic but doesn't output documents

Hello,

I'm trying to get the stack working as Filebeat => Kafka => Logstash => Elasticsearch

Sometimes the configuration works, but other times Logstash consumes the Kafka topic without sending any documents to any of its outputs (Elasticsearch, stdout, TCP...), and I can't identify why.

Filebeat config:

output.kafka:
  hosts: ["kafka2-3:9093","kafka1-3:9093","kafka3-3:9093"]
  topic: 'filebeat'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  codec.json:
     pretty: false
  ssl.certificate_authorities:
    - /etc/pki/tls/certs/ca.***.crt
  ssl.certificate: /etc/pki/tls/certs/***.crt
  ssl.key: /etc/pki/tls/private/***.key

Logstash config:

input {
  kafka {
    bootstrap_servers => "*:9093,*:9093,*:9093"
    topics => ["filebeat"]
    group_id => "logstash6"
    consumer_threads => 2
    ssl_keystore_password => "${keystore.password}"
    ssl_keystore_location => "/etc/logstash/client_keystore.jks"
    ssl_keystore_type => "jks"
    ssl_truststore_location => "/etc/logstash/client_truststore.jks"
    ssl_truststore_password => "${truststore.password}"
    ssl_truststore_type => "jks"
    ssl_endpoint_identification_algorithm => ""
    security_protocol => "SSL"
    decorate_events => false
    codec => line
    #codec => json
  }
}
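One detail I noticed while writing this up: Filebeat publishes JSON (codec.json in the output above), so the kafka input probably wants codec => json; with codec => line each event arrives as one raw JSON string in [message]. Something like:

```
input {
  kafka {
    # ...same connection/SSL settings as above...
    codec => json   # parse the JSON produced by Filebeat's codec.json
  }
}
```

(That alone shouldn't make events disappear, though -- with codec => line they would still reach the outputs, just unparsed.)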

output {
    ## Output to log archive
    tcp {
        port => 601
        host => "*.*.*.*"
        codec => line
        mode => "client"
    }
    #stdout { codec => line }
    # if [agent][type] == winlogbeat {
    #     pipeline { send_to => winlogbeat }
    # }
    #if [agent][type] == filebeat {
    #  pipeline { send_to => kv_filter }
    # }
    pipeline { send_to => elasticsearch }
}
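A side note on that last line: pipeline { send_to => elasticsearch } is Logstash pipeline-to-pipeline communication, so it only delivers if another pipeline declared in pipelines.yml listens on the matching virtual address. If that downstream pipeline is missing or stalled, backpressure blocks every output in this block, which would match the symptoms. Roughly how I understand the wiring (the pipeline ids and paths here are just examples, not my real ones):

```
# pipelines.yml
- pipeline.id: kafka_ingest
  path.config: "/etc/logstash/conf.d/kafka_ingest.conf"  # the input/output shown above
- pipeline.id: es_out
  path.config: "/etc/logstash/conf.d/es_out.conf"

# es_out.conf -- the address must match send_to exactly
input { pipeline { address => elasticsearch } }
output { elasticsearch { ... } }
```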

Elasticsearch output config:
       elasticsearch {
        hosts => [ "https://**:9200", "https://**:9200", "https://**:9200"]
        index => "logging-filebeat-%{+YYYY.MM.dd}"
        user => "${elasticsearch.username}"
        password => "${elasticsearch.password}"
        keystore_password => "${keystore.password}"
        keystore => "/etc/logstash/client_keystore.jks"
        truststore => "/etc/logstash/client_truststore.jks"
        truststore_password => "${truststore.password}"
        ssl => true
       }
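To narrow down where events stop, the Logstash monitoring API can report per-plugin event counters (assuming the default API port 9600):

```
# in/out event counts for every input, filter and output plugin
curl -s 'localhost:9600/_node/stats/pipelines?pretty'
# overall event flow for the process
curl -s 'localhost:9600/_node/stats/events?pretty'
```

If the input counters grow while the output counters stay at zero, the events are stuck inside the pipeline rather than in Kafka.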

Even with stdout { codec => line } enabled, Logstash prints nothing to the console.

In debug mode we see:

[2020-10-05T17:49:20,784][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(filebeat-0), toForget=(), implied=()) to broker logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:20,797][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Node 2 sent an incremental fetch response for session 942431334 with 1 response partition(s)
[2020-10-05T17:49:20,797][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Fetch READ_UNCOMMITTED at offset 7920067 for partition filebeat-0 returned fetch data (error=NONE, highWaterMark=10101507, lastStableOffset = 10101507, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2020-10-05T17:49:20,881][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash6] Sending Heartbeat request to coordinator logmgmt-kafka1-3:9093 (id: 2147483647 rack: null)
[2020-10-05T17:49:20,883][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash6] Using older server API v3 to send HEARTBEAT {group_id=logstash6,generation_id=21,member_id=logstash-0-30f35c9d-0b38-43b8-a086-bd693cd43b04,group_instance_id=null} with correlation id 87 to node 2147483647
[2020-10-05T17:49:21,367][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash6] Received successful Heartbeat response
[2020-10-05T17:49:21,438][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Added READ_UNCOMMITTED fetch request for partition filebeat-0 at position FetchPosition{offset=7931447, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=logmgmt-kafka3-3:9093 (id: 2 rack: null), epoch=-1}} to node logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:21,439][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Built incremental fetch (sessionId=942431334, epoch=63) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2020-10-05T17:49:21,439][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(filebeat-0), toForget=(), implied=()) to broker logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:21,457][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Node 2 sent an incremental fetch response for session 942431334 with 1 response partition(s)
[2020-10-05T17:49:21,457][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Fetch READ_UNCOMMITTED at offset 7931447 for partition filebeat-0 returned fetch data (error=NONE, highWaterMark=10121141, lastStableOffset = 10121141, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2020-10-05T17:49:22,082][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash6] Sending Heartbeat request to coordinator logmgmt-kafka1-3:9093 (id: 2147483647 rack: null)
[2020-10-05T17:49:22,083][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-1, groupId=logstash6] Using older server API v3 to send HEARTBEAT {group_id=logstash6,generation_id=21,member_id=logstash-1-94e3d3cb-c6eb-4f93-ba71-f62a95903825,group_instance_id=null} with correlation id 32 to node 2147483647
[2020-10-05T17:49:22,086][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash6] Received successful Heartbeat response
[2020-10-05T17:49:22,094][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Added READ_UNCOMMITTED fetch request for partition filebeat-0 at position FetchPosition{offset=7942668, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=logmgmt-kafka3-3:9093 (id: 2 rack: null), epoch=-1}} to node logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:22,094][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Built incremental fetch (sessionId=942431334, epoch=64) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2020-10-05T17:49:22,095][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(filebeat-0), toForget=(), implied=()) to broker logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:22,107][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Node 2 sent an incremental fetch response for session 942431334 with 1 response partition(s)
[2020-10-05T17:49:22,107][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Fetch READ_UNCOMMITTED at offset 7942668 for partition filebeat-0 returned fetch data (error=NONE, highWaterMark=10141574, lastStableOffset = 10141574, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2020-10-05T17:49:22,762][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Added READ_UNCOMMITTED fetch request for partition filebeat-0 at position FetchPosition{offset=7954536, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=logmgmt-kafka3-3:9093 (id: 2 rack: null), epoch=-1}} to node logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:22,763][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Built incremental fetch (sessionId=942431334, epoch=65) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2020-10-05T17:49:22,763][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(filebeat-0), toForget=(), implied=()) to broker logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:22,779][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Node 2 sent an incremental fetch response for session 942431334 with 1 response partition(s)
[2020-10-05T17:49:22,780][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Fetch READ_UNCOMMITTED at offset 7954536 for partition filebeat-0 returned fetch data (error=NONE, highWaterMark=10163099, lastStableOffset = 10163099, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2020-10-05T17:49:22,864][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash6] Sending asynchronous auto-commit of offsets {filebeat-0=OffsetAndMetadata{offset=7959536, leaderEpoch=2, metadata=''}}
[2020-10-05T17:49:22,865][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash6] Using older server API v7 to send OFFSET_COMMIT {group_id=logstash6,generation_id=21,member_id=logstash-0-30f35c9d-0b38-43b8-a086-bd693cd43b04,group_instance_id=null,topics=[{name=filebeat,partitions=[{partition_index=0,committed_offset=7959536,committed_leader_epoch=2,committed_metadata=}]}]} with correlation id 91 to node 2147483647

Describing the consumer group shows the documents being consumed:

 bin/kafka-consumer-groups.sh   --bootstrap-server **:9093 --describe --group logstash6 --command-config config/consumer.properties
[2020-10-05 17:48:23,635] WARN The configuration 'group.id' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)

Consumer group 'logstash6' has no active members.

    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    logstash6       filebeat        0          7081372         8232672         1151300         -               -               -

 bin/kafka-consumer-groups.sh   --bootstrap-server **:9093 --describe --group logstash6 --command-config config/consumer.properties
    [2020-10-05 17:49:22,527] WARN The configuration 'group.id' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)

    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
    logstash6       filebeat        0          7959536         10173678        2214142         logstash-0-30f35c9d-0b38-43b8-a086-bd693cd43b04 /*****     logstash-0

I do see documents in the topic when I run the console consumer with --from-beginning --command-config config/consumer.properties.
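For reference, that check was along these lines (the console consumer takes --consumer.config rather than --command-config, if I remember correctly; broker masked as above):

```
bin/kafka-console-consumer.sh --bootstrap-server **:9093 --topic filebeat \
  --from-beginning --consumer.config config/consumer.properties
```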

I tried Logstash 7.6, 7.8, and 7.8.1, and I'm on 7.9 now.

Kafka is version 2.12-2.3.0.
Filebeat is on 7.9 now (tried 7.6, 7.8, and 7.9).
Elasticsearch is on 7.9 now (tried 7.6, 7.8, and 7.9).

Thanks for the support.

Hello,

I'm facing the same issue.

Filebeat -> Kafka Topic -> Logstash -> ELK Cluster

Data from the Kafka topic shows up in output { stdout { codec => rubydebug } } for cluster 1.
Data from the same Kafka topic is successfully delivered to an index on cluster 2.
Only delivery to cluster 1 fails.

So it is not a Logstash pipeline issue.

Can anyone suggest what the problem could be here? Our problems are very similar, and most probably the same solution applies.

Is there something I can verify on Logstash?

In debug mode, we only see that it connects to Kafka and reads and commits offsets.

Maybe we can print something between input, filter, and output?
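For example, a temporary tap like this should show whether events make it past the input at all (the tag name is arbitrary):

```
filter {
  mutate { add_tag => ["seen_by_filter"] }  # proves the event reached the filter stage
}
output {
  stdout { codec => rubydebug }             # temporary; remove once diagnosed
}
```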

Something that confirms there is an issue: when I press Ctrl+C to shut Logstash down and wait for it to flush the pipeline, it says:

[2020-10-09T16:26:23,953][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2020-10-09T16:26:28,953][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2020-10-09T16:26:33,953][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[... the same message repeats every 5 seconds ...]
[2020-10-09T16:28:53,952][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.

I need to force the shutdown to get the process to stop.
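Two things that might help with the stuck shutdown (the first is a real Logstash setting; use it knowingly, since it can drop in-flight events):

```
# logstash.yml -- let Logstash exit even with unflushed in-flight events
pipeline.unsafe_shutdown: true
```

A thread dump of the Logstash JVM (jstack <pid>) should also show which call the pipeline workers are blocked in -- presumably a write to one of the outputs.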

Does anyone have any ideas?

Thanks

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.