Understanding HA with Kafka

Hi, I'm running 6.1.1.

I have 3 Kafka brokers (Confluent 3.3.1e, Kafka 0.11.0). I physically downed 2 of the brokers for 10 minutes, but logs kept getting indexed!

The topic has replica count 1.

Unless I'm misunderstanding something about Kafka, shouldn't required_acks: -1 prevent Filebeat from writing to Kafka?

The Kafka min.insync.replicas is set to 1.

The config:

output.kafka:
  hosts: ["xxx.xxx.xxx.xxx:9092"]

  version: 0.11.0.0

  topic: "xxxxxx"
  partition.round_robin:
    reachable_only: true

  required_acks: -1
  max_message_bytes: 2000000

Kafka splits a topic into N partitions. These partitions are subject to replication. When writing, a client selects the partition to write to. Each partition has one broker that is currently its leader. The leader accepts events from the client and replicates the messages if possible. That is, each partition has a leader plus a number of replica brokers. The replicas are split into a set of in-sync replicas and a set of out-of-sync replicas. Any in-sync replica can become the new leader for a partition.
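A minimal Python sketch of this model, using a hypothetical toy Partition class (not Kafka's or Filebeat's API). It assumes unclean leader election is disabled, so only in-sync replicas are eligible to take over, and it picks the new leader with an arbitrary deterministic rule rather than Kafka's real controller logic:

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class Partition:
    """Toy model of one Kafka partition (hypothetical, for illustration only)."""
    pid: int
    leader: Optional[int]                       # broker id of the leader, None = no leader
    replicas: List[int]                         # all assigned brokers, leader included
    isr: Set[int] = field(default_factory=set)  # in-sync replicas, leader included

    def out_of_sync(self) -> Set[int]:
        """Assigned replicas that have fallen behind the leader."""
        return set(self.replicas) - self.isr

    def eligible_leaders(self) -> Set[int]:
        """With unclean leader election disabled, only in-sync replicas may take over."""
        return set(self.isr)

    def fail_broker(self, broker: int) -> None:
        """Drop a failed broker from the ISR and move leadership if it was the leader."""
        self.isr.discard(broker)
        if self.leader == broker:
            candidates = self.eligible_leaders()
            # Arbitrary but deterministic pick; the real Kafka controller has its own rules.
            self.leader = min(candidates) if candidates else None


# Replication factor 3: losing the leader promotes another in-sync replica.
p = Partition(pid=0, leader=1, replicas=[1, 2, 3], isr={1, 2, 3})
p.fail_broker(1)
print(p.leader)  # 2

# Replication factor 1: losing the only broker leaves the partition without a leader.
q = Partition(pid=1, leader=2, replicas=[2], isr={2})
q.fail_broker(2)
print(q.leader)  # None
```

The second case is the situation from the original question: with a single replica, a downed broker simply takes its partitions offline.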

The Metricbeat partition metricset (part of the kafka module) collects information about brokers, partitions, leaders and in-sync replicas.

As long as the Kafka client finds a leader broker willing to ACK events for a partition, the partition is reachable.

If a broker does not ACK events (because ACKs from N replicas are required as well), the partition is not reachable.

You configured reachable_only: true. This forces the Kafka client to distribute events only across reachable partitions. That is, as long as at least one partition is reachable, Beats can continue pushing events. Setting reachable_only to true is not recommended, as it can potentially send all events to a single partition, which breaks load balancing.

Setting reachable_only: false (the default) should block Beats the moment one partition becomes unreachable.
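The difference between the two settings can be sketched as a round-robin partitioner. This is a hypothetical simplification, not Filebeat's actual partitioner: a partition counts as reachable when it currently has a leader, "blocking" is modelled as returning None, and the three-partition topic is made up for the example:

```python
from itertools import count
from typing import Dict, Optional


class RoundRobinPartitioner:
    """Simplified round-robin partitioner (hypothetical, not Filebeat's actual code)."""

    def __init__(self, leaders: Dict[int, Optional[int]], reachable_only: bool) -> None:
        # leaders maps partition id -> leader broker id (None = no leader, unreachable)
        self.leaders = leaders
        self.reachable_only = reachable_only
        self._counter = count()

    def next_partition(self) -> Optional[int]:
        """Partition for the next event, or None if the publisher would block."""
        if self.reachable_only:
            # Skip partitions without a leader entirely.
            candidates = [p for p in sorted(self.leaders) if self.leaders[p] is not None]
        else:
            candidates = sorted(self.leaders)
        if not candidates:
            return None  # no partition can take the event: block
        partition = candidates[next(self._counter) % len(candidates)]
        if self.leaders[partition] is None:
            return None  # reachable_only=False and this partition is down: block
        return partition


# Replication factor 1, two of three brokers down: only partition 2 still has a leader.
leaders = {0: None, 1: None, 2: 3}
relaxed = RoundRobinPartitioner(leaders, reachable_only=True)
print([relaxed.next_partition() for _ in range(4)])  # [2, 2, 2, 2]
strict = RoundRobinPartitioner(leaders, reachable_only=False)
print(strict.next_partition())  # None
```

With reachable_only: true, publishing keeps going but every event lands on the one surviving partition; with false, the publisher stalls instead.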

So reachable_only: true overrides acks?

No. reachable_only: true does not override ACKs. In Kafka, the broker replicates the messages to other brokers. That is, ACK handling must be enforced by the broker! If a leader becomes unresponsive, reachable_only makes the client ignore that leader's partitions until the leader is available again or Kafka has elected a new leader.

Let's have a look at the Kafka protocol guide. About RequiredAcks it says: "If it is -1 the server will block until the message is committed by all in sync replicas before sending a response". A replica that is not in sync, or not available at all, might not be required to ACK a message to the client.

Also have a look at the kafka broker configuration for min.insync.replicas.

A quick search will turn up a number of talks on Kafka HA and configuration (for example).

Cool. I already mentioned that min.insync.replicas = 1 in my config.

Ah, I missed that one. Keep in mind that the leader counts as an in-sync replica as well. If you set the replication factor to 3, consider min.insync.replicas = 2.
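The broker-side rule behind this advice can be sketched as a small decision function. It is a simplified model, and the function name and return strings are hypothetical: with acks = -1 the leader rejects the write when the ISR is smaller than min.insync.replicas (Kafka reports this as NOT_ENOUGH_REPLICAS), and otherwise ACKs once every current ISR member has the message:

```python
from typing import Optional


def produce_outcome(leader: Optional[int], isr_size: int, min_insync: int, acks: int) -> str:
    """Simplified broker-side decision for one produce request (hypothetical helper)."""
    if leader is None:
        return "no leader: partition unreachable"
    if acks == -1 and isr_size < min_insync:
        # min.insync.replicas is only enforced for acks=-1 (all).
        return "rejected: NOT_ENOUGH_REPLICAS"
    # acks=-1: ACK once all current ISR members (leader included) have the message.
    # acks=1: ACK once the leader has written it. acks=0: no ACK is sent at all.
    return "accepted"


# Replication factor 1, min.insync.replicas=1: the ISR is just the leader.
print(produce_outcome(leader=1, isr_size=1, min_insync=1, acks=-1))
# Replication factor 3, min.insync.replicas=2, two brokers down: ISR shrinks to 1.
print(produce_outcome(leader=1, isr_size=1, min_insync=2, acks=-1))
```

With a single replica, acks = -1 adds no extra safety: the leader is the whole ISR, so as long as it is up the write is accepted, and when it is down the partition has no leader at all.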

OK, so now I'm testing with:

  • min.insync.replicas: 2
  • Topic: 18 partitions, 3 replicas
  • unclean_leader_election_enable: false
  • Filebeat reachable_only: false
  • Logstash: All defaults, 3 instances with 6 consumer threads each.

What should I expect when I down a broker?

  • Will we miss documents?
  • Will the exact amount of documents be indexed?
  • Will more than the expected amount of documents be indexed (NOT checking for duplicates for now)?

I noticed point 3. Is that expected?

Will we miss documents?

Filebeat employs infinite retry. Internally it drops no events.

But as always, it depends. If the output is down, no events can be published and Filebeat will be blocked. Once its buffers are full, no new lines are read from your log files. If Kafka/Filebeat stays in this non-processing state for too long and file rotation kicks in, two things can happen (depending on configs): a) missing documents, because the original files have been deleted, or b) Filebeat keeps the files open (a file is only really deleted once no process accesses it anymore), eventually letting the system run out of disk space.

That is, your disk acts like a queue. It's up to you whether to allow dropped events or to risk running out of disk space. Depending on sizing and log volume, this can happen after days or within minutes.
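To get a feel for "days or within minutes", a rough back-of-the-envelope estimate helps: with size-based rotation, unread data starts disappearing after roughly max_files x max_file_size bytes of new log output. A minimal Python sketch, assuming a constant write rate (the function name and the example rates are hypothetical):

```python
def outage_budget_seconds(max_file_bytes: int, max_files: int, write_rate_bytes_per_s: float) -> float:
    """Rough time before rotation deletes data Filebeat has not read yet.

    Assumes size-based rotation keeping max_files files of max_file_bytes each,
    a constant write rate, and that Filebeat stopped reading when the output went down.
    """
    if write_rate_bytes_per_s <= 0:
        return float("inf")  # nothing is written, so nothing gets rotated away
    return max_file_bytes * max_files / write_rate_bytes_per_s


# 100 MB x 10 files at a hypothetical 1 MB/s: about 1000 s (~17 minutes).
print(outage_budget_seconds(100_000_000, 10, 1_000_000))
# Same rotation at a hypothetical 10 KB/s: about 100000 s (~28 hours).
print(outage_budget_seconds(100_000_000, 10, 10_000))
```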

Plus, once Kafka becomes available again, Filebeat might have quite a backlog of data to publish, leading to increased CPU, disk and network usage. If usage cannot go up, Filebeat and Kafka were already running at their limit.
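The "running at the limit" point can be made concrete: while catching up, only the headroom between publishing capacity and the incoming log rate goes into the backlog. A rough Python sketch under that assumption (the helper name and all rates are hypothetical):

```python
def drain_seconds(backlog_bytes: float, capacity_bytes_per_s: float, incoming_bytes_per_s: float) -> float:
    """Rough time to publish a backlog while new log lines keep arriving.

    Only the headroom (publishing capacity minus the incoming rate) can be spent
    on the backlog; without headroom the backlog never shrinks.
    """
    if backlog_bytes <= 0:
        return 0.0
    headroom = capacity_bytes_per_s - incoming_bytes_per_s
    if headroom <= 0:
        return float("inf")
    return backlog_bytes / headroom


# Hypothetical numbers: a 10 minute outage at 1 MB/s leaves a 600 MB backlog.
print(drain_seconds(600e6, 3e6, 1e6))  # 300.0 s with 3 MB/s of capacity
print(drain_seconds(600e6, 1e6, 1e6))  # inf: already running at the limit
```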

Will the exact amount of documents be indexed?
Will more than the expected amount of documents be indexed (NOT checking for duplicates for now)?

Beats retry publishing events until Kafka has ACKed them. If Kafka does not ACK, or we are facing network errors, Beats have no idea whether the events have been processed by Kafka or not, so they send the same events again, which results in duplicates.
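A toy Python simulation of why retry-until-ACK produces duplicates. The broker class and fault injection are hypothetical, and real Beats retry whole batches rather than single events; the point is only that a write can succeed while its ACK is lost:

```python
from typing import List, Set


class FlakyBroker:
    """Toy broker: every write succeeds, but some ACKs never reach the client."""

    def __init__(self, lose_ack_on_attempts: Set[int]) -> None:
        self.log: List[str] = []
        self._attempt = 0
        self._lose_ack_on = lose_ack_on_attempts

    def produce(self, event: str) -> bool:
        """Store the event; return True only if the ACK arrives back at the client."""
        self._attempt += 1
        self.log.append(event)  # the broker has the event either way ...
        return self._attempt not in self._lose_ack_on  # ... but the ACK may be lost


def publish_at_least_once(broker: FlakyBroker, events: List[str]) -> None:
    """Retry every event until it is ACKed (simplified stand-in for Beats' retry loop)."""
    for event in events:
        while not broker.produce(event):
            pass  # no ACK: the client cannot tell whether the write happened, so it resends


broker = FlakyBroker(lose_ack_on_attempts={2})  # lose the ACK of the 2nd request
publish_at_least_once(broker, ["a", "b", "c"])
print(broker.log)  # ['a', 'b', 'b', 'c']
```

Nothing is lost, but "b" is stored twice: at-least-once delivery trades missing events for possible duplicates.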

Thanks

Actually one final question.

In my application I write 2 log lines using log4j. I send requests to my application for a couple of hours, then count the total of each line written.

The first line is sometimes missing one or two entries, while the second line is all there... Is that because of log rotation? I have tail_files: false for Filebeat.

Yeah, it seems the first line is always missing a couple, while the second line is there every time...

This should not happen. If you test with console output instead of Kafka, is the first part still missing?

Please share your complete filebeat config + filebeat debug logs.

I need to find a way to test this, because I'm sending thousands of logs and then counting them. I will try to set up a dedicated topic just for that application, check whether the numbers at least match in Kafka, and see if I can rule that out as the issue.

filebeat.prospectors:

- input_type: log
  paths:
    - /var/lib/mesos/slave/slaves/*/frameworks/*/executors/*/runs/latest/stdout
    - /var/lib/mesos/slave/slaves/*/frameworks/*/executors/*/runs/latest/stderr
  fields:
    source_type: "framework"
  fields_under_root: true
  tail_files: false

- input_type: log
  paths:
    - /var/log/mesos/*.log
    - /var/log/dcos/dcos.log
  fields:
    source_type: "dcos"
  fields_under_root: true
  tail_files: true

output.kafka:
  hosts: ["broker.confluent-kafka-prod.l4lb.thisdcos.directory:9092"] # This is load balanced using Mesos DNS.

  version: 0.11.0.0

  topic: "xxx"
  partition.round_robin:
    reachable_only: false

  required_acks: -1
  max_message_bytes: 2000000

OK, so I narrowed down the issue, but I'm not sure what to make of it...

Some facts...

  • I'm using DC/OS 1.10.4.
  • I have a single container deployed, running a simple HTTP service that logs and replies 200 OK.
  • The application logs to the console using logback, and DC/OS forwards this output to its stdout log files.
  • Log rotation is set up at 100 MB, 10 files.
  • Filebeat is only configured to read that container's log.
  • Filebeat pushes only that container's logs to a "test" topic (18 partitions, 3 replicas).
  • 3 instances of Logstash with 6 consumer threads each.
  • I use JMeter to send requests to the application.
  • I send 4200000 HTTP requests to the application.

So...

  • In Kafka I have 4200010 records.
  • In Elastic I also have 4200010.
  • Out of the 4200010, 12 are log-rotation records (DC/OS logs 2 lines per rotation).
  • Logstash reports grok parse failures for those 12 records, because they don't match the app's pattern.
  • So 2 records from my application are missing.
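The bookkeeping behind these numbers, as a tiny Python helper (the name is hypothetical). It assumes one application log line per HTTP request, which is what the 4,200,000 expected records imply, and that every other indexed document is a DC/OS rotation line:

```python
def missing_app_records(sent_requests: int, indexed_docs: int, rotations: int,
                        lines_per_rotation: int = 2) -> int:
    """Expected application records minus those actually indexed.

    Assumes one application log line per request and that every other indexed
    document is a DC/OS log-rotation line. Negative results would mean duplicates.
    """
    rotation_docs = rotations * lines_per_rotation
    app_docs = indexed_docs - rotation_docs
    return sent_requests - app_docs


# Numbers from the test above: 12 rotation lines = 6 rotations x 2 lines.
print(missing_app_records(4_200_000, 4_200_010, rotations=6))  # 2
```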

So I guess during a log rotation 2 records got swallowed?

Also posting this on DC/OS groups.

The use of tail_files: true in your second prospector looks suspicious. It's kind of an unsafe setting, and personally I would never set it to true. Filebeat, being a log collector, already tails files. The tail_files setting places the read pointer at the end of the log file when the file is opened. Worse, depending on the log producer's buffering/writing strategy, Filebeat might start reading right in the middle of a line, which makes the first event unparsable.
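A minimal Python sketch of the mid-line problem. It only mimics the effect, not Filebeat's code: it assumes the producer flushed a partial line just before the reader opened the file and jumped to its end (the file name and helper names are hypothetical):

```python
import os
import tempfile
from typing import List


def read_after_tail(path: str, offset: int) -> List[str]:
    """Read lines starting at a byte offset, like a reader that seeked to the end."""
    with open(path, "rb") as f:
        f.seek(offset)
        return [line.decode().rstrip("\n") for line in f]


def tail_files_demo() -> List[str]:
    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "app.log")
        with open(path, "wb") as w:
            # The producer flushed its buffer in the middle of a line ...
            w.write(b"first line\nsecond li")
            w.flush()
            # ... and the reader opens the file "tail_files style": jump to the current end.
            offset = os.path.getsize(path)
            # The producer then finishes the line and writes another one.
            w.write(b"ne\nthird line\n")
        return read_after_tail(path, offset)


print(tail_files_demo())  # ['ne', 'third line']
```

The first event is the fragment "ne", which a grok pattern for complete lines will fail to parse.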

The only reason people use tail_files is to suppress the collection of historical data, but this is better done with a fresh registry file. To get a clean registry file that starts at the end of the logs (i.e. without publishing already existing lines), run Filebeat once with tail_files and console output, stop Filebeat, remove the tail_files setting, and start Filebeat again. This way, Filebeat processes all new content.

Hi, thanks. The issue is with the first prospector which has tail_files: false.

DC/OS prints a state log when it rotates. I have printed millions of logs and only 2 go missing. So I know kafka has them all. I know Elastic got them all. Elastic reported no index rejections.

Kafka says it has 4200010, Elastic has 4200010.

Elastic query shows 12 DC/OS log rotations and my records are at 4,199,998. So somehow the log rotation is causing this? (To be clear DC/OS prints the log rotation inside the same stdout file, it's basically that containers stdout as a file)

Out of curiosity...

The Filebeat pattern is: /var/lib/mesos/slave/slaves/*/frameworks/*/executors/*/runs/latest/stdout*

I have the following files...

stdout
stdout.1
stdout.2
stdout...
stdout.9
stdout.logrotate.state <---- Changes every time log rotation happens

Would this affect the prospector somehow? I'm assuming yes
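Filebeat expands prospector paths with Go's glob rules. For a simple trailing * on a file name, Python's fnmatch.fnmatchcase behaves the same way, so it can serve as a quick check of which of the listed files the pattern picks up. This is a sketch that matches bare file names only, not full paths:

```python
from fnmatch import fnmatchcase
from typing import List


def matching_files(names: List[str], pattern: str) -> List[str]:
    """Return the file names a glob pattern picks up (last path component only)."""
    return [name for name in names if fnmatchcase(name, pattern)]


# File names from the listing above (stdout.3 ... stdout.8 omitted for brevity).
listing = ["stdout", "stdout.1", "stdout.2", "stdout.9", "stdout.logrotate.state"]
print(matching_files(listing, "stdout*"))  # every file, including stdout.logrotate.state
print(matching_files(listing, "stdout"))   # ['stdout']
```

With the trailing *, the rotated files and the rotation state file are all harvested, not just the live stdout.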

I can confirm 100%: stdout.logrotate.state interferes with /var/lib/mesos/slave/slaves/*/frameworks/*/executors/*/runs/latest/stdout*

So whenever DC/OS/Mesos touches the stdout.logrotate.state file, Filebeat misses 2 records.

Please format logs, configs and terminal input/output using the </>-Button or markdown code fences. This forum uses Markdown to format posts. I didn't see that you were using * at the end of your path.

This makes me wonder how exactly log-rotation works. Normally it's just some file being renamed (e.g. using mv).

You are also collecting stdout.logrotate.state? This file seems to be somewhat special and might be subject to truncation. Consider excluding this file from your prospector.

Filebeat can only read what has been written. If content gets deleted (e.g. by file truncation), data loss is very likely. There is no real synchronisation between processes on log files, so you will always have some kind of race. Log writers should only append content, and log rotation should only rename files. Then you can be reasonably sure to get all logs.
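The race can be illustrated with a toy in-memory model (hypothetical classes, not Filebeat internals). It assumes a reader that tracks a byte offset per open file and restarts from the beginning when a file becomes shorter than that offset. Truncating in place loses the line written just before rotation, while renaming keeps it reachable through the still-open handle:

```python
from typing import List


class Inode:
    """Contents of one file; renaming a path does not change its inode (toy model)."""

    def __init__(self) -> None:
        self.data = b""


class Reader:
    """Simplified harvester: keeps an open inode and the byte offset read so far."""

    def __init__(self, inode: Inode) -> None:
        self.inode = inode
        self.offset = 0

    def read_lines(self) -> List[str]:
        if len(self.inode.data) < self.offset:
            # File shrank: assume truncation and start over. (Had more than `offset`
            # bytes been written since, the truncation would go unnoticed instead.)
            self.offset = 0
        chunk = self.inode.data[self.offset:]
        end = chunk.rfind(b"\n") + 1  # consume complete lines only
        self.offset += end
        return [line.decode() for line in chunk[:end].splitlines()]


def truncate_scenario() -> List[str]:
    f = Inode()
    r = Reader(f)
    f.data += b"a\nb\n"
    seen = r.read_lines()   # reads a, b (offset 4)
    f.data += b"c\n"        # written but not yet read ...
    f.data = b""            # ... when rotation truncates the file in place
    f.data += b"d\n"
    seen += r.read_lines()  # size 2 < offset 4: restart at 0, reads only d
    return seen


def rename_scenario() -> List[str]:
    old = Inode()
    r = Reader(old)
    old.data += b"a\nb\n"
    seen = r.read_lines()   # reads a, b
    old.data += b"c\n"      # written but not yet read ...
    new = Inode()           # ... when rotation renames the path; the writer opens a new file
    new.data += b"d\n"
    seen += r.read_lines()  # the open handle still sees the renamed file: reads c
    seen += Reader(new).read_lines()  # the new file gets its own reader: reads d
    return seen


print(truncate_scenario())  # ['a', 'b', 'd']  ("c" is lost)
print(rename_scenario())    # ['a', 'b', 'c', 'd']
```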

Can you tell which files you are missing messages from?

Oops, sorry. I noticed and corrected the markdown in the second post.

So the DC/OS docs say to collect files using the .../stdout* pattern.

This matches all files including the file stdout.logrotate.state which happens to also be in the same folder as the stdout files.

When log rotation happens, DC/OS touches this file and writes a 2-line entry to it.

I'm no Linux file expert, nor do I know how Filebeat treats files internally, but for whatever reason, exactly 2 records get missed whenever that file gets touched. This happens on every rotation.

The simple solution was to use exclude_files.
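A sketch of what exclude_files does to the harvested set, in Python: names are first matched against the glob and then dropped if any exclude regex matches. The regex logrotate\.state$ is an assumption (the pattern actually used isn't shown here); in the prospector it would correspond to something like exclude_files: ['logrotate\.state$']. Filebeat uses Go regular expressions, which behave like Python's re.search for a simple pattern like this one:

```python
import re
from fnmatch import fnmatchcase
from typing import List


def harvested(names: List[str], glob: str, exclude_files: List[str]) -> List[str]:
    """Names matching the glob and none of the exclude regexes (simplified model)."""
    excludes = [re.compile(p) for p in exclude_files]
    return [
        name for name in names
        if fnmatchcase(name, glob) and not any(rx.search(name) for rx in excludes)
    ]


names = ["stdout", "stdout.1", "stdout.9", "stdout.logrotate.state", "stderr"]
print(harvested(names, "stdout*", []))                      # includes the state file
print(harvested(names, "stdout*", [r"logrotate\.state$"]))  # ['stdout', 'stdout.1', 'stdout.9']
```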

I will let the DC/OS community know and see what comes of it.