Hello,
I'm trying to get the stack working with Filebeat => Kafka <= Logstash => Elasticsearch
Sometimes the configuration is working but sometimes logstash is consuming the kafka topic but not sending the document to all output (elasticsearch, stdout, TCP..), and I can't identify why..
Filebeat Config
output.kafka:
hosts: ["kafka2-3:9093","kafka1-3:9093","kafka3-3:9093"]
topic: 'filebeat'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max-message_bytes: 1000000
codec.json:
pretty: false
ssl.certificate_authorities:
- /etc/pki/tls/certs/ca.***.crt
ssl.certificate: /etc/pki/tls/certs/***.crt
ssl.key: /etc/pki/tls/private/***.key
Logstash Config :
input {
kafka {
bootstrap_servers => "*:9093,*:9093,*:9093"
topics => ["filebeat"]
group_id => "logstash6"
consumer_threads => 2
ssl_keystore_password => "${keystore.password}"
ssl_keystore_location => "/etc/logstash/client_keystore.jks"
ssl_keystore_type => "jks"
ssl_truststore_location => "/etc/logstash//client_truststore.jks"
ssl_truststore_password => "${truststore.password}"
ssl_truststore_type => "jks"
ssl_endpoint_identification_algorithm => ""
security_protocol => "SSL"
decorate_events => false
codec => line
#codec => json
}
}
output {
## Output to log archive
tcp {
port => 601
host => "*.*.*.*"
codec => line
mode => "client"
}
#stdout { codec => line }
# if [agent][type] == winlogbeat {
# pipeline { send_to => winlogbeat }
# }
#if [agent][type] == filebeat {
# pipeline { send_to => kv_filter }
# }
pipeline { send_to => elasticsearch }
}
Elasticsearch config :
elasticsearch {
hosts => [ "https://**:9200", "https://**:9200", "https://**:9200"]
index => "logging-filebeat-%{+YYYY.MM.dd}"
user => "${elasticsearch.username}"
password => "${elasticsearch.password}"
keystore_password => "${keystore.password}"
keystore => "/etc/logstash/client_keystore.jks"
truststore => "/etc/logstash/client_truststore.jks"
truststore_password => "${truststore.password}"
ssl => true
}
Logstash with stdout { codec => line } not output in console
In debug mode we see :
[2020-10-05T17:49:20,784][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(filebeat-0), toForget=(), implied=()) to broker logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:20,797][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Node 2 sent an incremental fetch response for session 942431334 with 1 response partition(s)
[2020-10-05T17:49:20,797][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Fetch READ_UNCOMMITTED at offset 7920067 for partition filebeat-0 returned fetch data (error=NONE, highWaterMark=10101507, lastStableOffset = 10101507, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2020-10-05T17:49:20,881][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash6] Sending Heartbeat request to coordinator logmgmt-kafka1-3:9093 (id: 2147483647 rack: null)
[2020-10-05T17:49:20,883][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash6] Using older server API v3 to send HEARTBEAT {group_id=logstash6,generation_id=21,member_id=logstash-0-30f35c9d-0b38-43b8-a086-bd693cd43b04,group_instance_id=null} with correlation id 87 to node 2147483647
[2020-10-05T17:49:21,367][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash6] Received successful Heartbeat response
[2020-10-05T17:49:21,438][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Added READ_UNCOMMITTED fetch request for partition filebeat-0 at position FetchPosition{offset=7931447, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=logmgmt-kafka3-3:9093 (id: 2 rack: null), epoch=-1}} to node logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:21,439][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Built incremental fetch (sessionId=942431334, epoch=63) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2020-10-05T17:49:21,439][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(filebeat-0), toForget=(), implied=()) to broker logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:21,457][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Node 2 sent an incremental fetch response for session 942431334 with 1 response partition(s)
[2020-10-05T17:49:21,457][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Fetch READ_UNCOMMITTED at offset 7931447 for partition filebeat-0 returned fetch data (error=NONE, highWaterMark=10121141, lastStableOffset = 10121141, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2020-10-05T17:49:22,082][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash6] Sending Heartbeat request to coordinator logmgmt-kafka1-3:9093 (id: 2147483647 rack: null)
[2020-10-05T17:49:22,083][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-1, groupId=logstash6] Using older server API v3 to send HEARTBEAT {group_id=logstash6,generation_id=21,member_id=logstash-1-94e3d3cb-c6eb-4f93-ba71-f62a95903825,group_instance_id=null} with correlation id 32 to node 2147483647
[2020-10-05T17:49:22,086][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash6] Received successful Heartbeat response
[2020-10-05T17:49:22,094][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Added READ_UNCOMMITTED fetch request for partition filebeat-0 at position FetchPosition{offset=7942668, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=logmgmt-kafka3-3:9093 (id: 2 rack: null), epoch=-1}} to node logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:22,094][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Built incremental fetch (sessionId=942431334, epoch=64) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2020-10-05T17:49:22,095][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(filebeat-0), toForget=(), implied=()) to broker logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:22,107][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Node 2 sent an incremental fetch response for session 942431334 with 1 response partition(s)
[2020-10-05T17:49:22,107][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Fetch READ_UNCOMMITTED at offset 7942668 for partition filebeat-0 returned fetch data (error=NONE, highWaterMark=10141574, lastStableOffset = 10141574, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2020-10-05T17:49:22,762][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Added READ_UNCOMMITTED fetch request for partition filebeat-0 at position FetchPosition{offset=7954536, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=logmgmt-kafka3-3:9093 (id: 2 rack: null), epoch=-1}} to node logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:22,763][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Built incremental fetch (sessionId=942431334, epoch=65) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2020-10-05T17:49:22,763][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(filebeat-0), toForget=(), implied=()) to broker logmgmt-kafka3-3:9093 (id: 2 rack: null)
[2020-10-05T17:49:22,779][DEBUG][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash6] Node 2 sent an incremental fetch response for session 942431334 with 1 response partition(s)
[2020-10-05T17:49:22,780][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash6] Fetch READ_UNCOMMITTED at offset 7954536 for partition filebeat-0 returned fetch data (error=NONE, highWaterMark=10163099, lastStableOffset = 10163099, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2020-10-05T17:49:22,864][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash6] Sending asynchronous auto-commit of offsets {filebeat-0=OffsetAndMetadata{offset=7959536, leaderEpoch=2, metadata=''}}
[2020-10-05T17:49:22,865][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash6] Using older server API v7 to send OFFSET_COMMIT {group_id=logstash6,generation_id=21,member_id=logstash-0-30f35c9d-0b38-43b8-a086-bd693cd43b04,group_instance_id=null,topics=[{name=filebeat,partitions=[{partition_index=0,committed_offset=7959536,committed_leader_epoch=2,committed_metadata=}]}]} with correlation id 91 to node 2147483647
In kafka topic we see the document consume :
bin/kafka-consumer-groups.sh --bootstrap-server **:9093 --describe --group logstash6 --command-config config/consumer.properties
[2020-10-05 17:48:23,635] WARN The configuration 'group.id' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
Consumer group 'logstash6' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
logstash6 filebeat 0 7081372 8232672 1151300 - - -
bin/kafka-consumer-groups.sh --bootstrap-server **:9093 --describe --group logstash6 --command-config config/consumer.properties
[2020-10-05 17:49:22,527] WARN The configuration 'group.id' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
logstash6 filebeat 0 7959536 10173678 2214142 logstash-0-30f35c9d-0b38-43b8-a086-bd693cd43b04 /***** logstash-0
I really see document in topic when is execute --from-beginning --command-config config/consumer.properties
I try with logstash 7.6, 7.8, 7.8.1 and I'm in 7.9 now.
Kafka is in version 2.12-2.3.0
Filebeat in version 7.9 now (try 7.6, 7.8 and 7.9)
Elasticsearch in 7.9 now (try 7.6, 7.8 and 7.9)
Thanks for support