Out-of-order delivery of messages in Kafka

I want to process the logs of a system that uses a rolling policy. The logs must be processed in order, so I deployed the latest version of Filebeat (filebeat-7.5.2-linux-x86_64) to publish the log messages to Kafka. However, the messages arrive in Kafka out of order. My Filebeat configuration and Kafka topic configuration are below.

filebeat.yml

filebeat.inputs:
  - type: log
    paths:
      - /path/to/log/file
    multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}'
    multiline.negate: true
    multiline.match: after

output.kafka:
  codec.format:
    string: '%{[message]}'
  hosts: ["broker-1:9092","broker-2:9092","broker-3:9092"]
  topic: 'topic'
  required_acks: 1
  compression: gzip
  version: 0.8.2.1
  worker: 1

#logging.level: debug
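For readers unfamiliar with the multiline settings above, here is a minimal Python sketch of how `negate: true` with `match: after` groups lines: any line that does not start with a timestamp is appended to the preceding event. This only approximates Filebeat's behavior (it ignores the flush timeout and line limits), and the sample lines are made up for illustration.

```python
import re

# Same regular expression as multiline.pattern in the filebeat.yml above.
PATTERN = re.compile(
    r'^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}'
)


def group_multiline(lines):
    """Approximate negate: true / match: after.

    A line that matches the pattern starts a new event; a line that does
    NOT match is appended to the event before it.
    """
    events = []
    for line in lines:
        if PATTERN.search(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events


# Hypothetical sample lines, not taken from the real Hive log.
sample = [
    "2020-02-21 17:23:22,975 INFO first event",
    "    continuation line without a timestamp",
    "2020-02-21 17:23:36,773 INFO second event",
]

for event in group_multiline(sample):
    print(repr(event))
```

With this grouping, the last event in a file is only emitted once a new timestamped line arrives or the multiline timeout fires, which is why the debug output can show "Multiline event flushed because timeout reached."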

Configuration of the Kafka topic (the Kafka cluster has three brokers, as listed in filebeat.yml):

PartitionCount:1	
ReplicationFactor:1	
Configs:retention.ms=86400000

Partition: 0	
Leader: 3	
Replicas: 3	
Isr: 3

Could you please share debug logs of Filebeat?

Here is part of the log. If you need more information, let me know. Thanks.

2020-02-21T17:23:26.051Z	DEBUG	[harvester]	log/log.go:107	End of file reached: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out; Backoff now.
2020-02-21T17:23:28.051Z	DEBUG	[multiline]	multiline/multiline.go:175	Multiline event flushed because timeout reached.
2020-02-21T17:23:28.051Z	DEBUG	[processors]	processing/processors.go:186	Publish event: {
  "@timestamp": "2020-02-21T17:23:23.051Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.5.2"
  },
  "message": "2020-02-21 17:23:22,975 INFO  org.apache.hive.service.cli.operation.OperationManager: [HiveServer2-Handler-Pool: Thread-17332]: Adding operation: OperationHandle [opType=EXECUTE_STATEMENT, getHandleIdentifier()=c3043717-2a1a-45af-bbae-0b06f467fb92]",
  "log": {
    "offset": 65565908,
    "file": {
      "path": "/var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out"
    }
  },
  "input": {
    "type": "log"
  },
  "ecs": {
    "version": "1.1.0"
  },
  "host": {
    "name": "ip-"
  },
  "agent": {
    "ephemeral_id": "bee9392d-b4ef-490d-82d7-f1e329313ec5",
    "hostname": "ip-",
    "id": "c719490a-a7fe-434f-b706-fb73b5091ef8",
    "version": "7.5.2",
    "type": "filebeat"
  }
}
2020-02-21T17:23:29.052Z	DEBUG	[kafka]	kafka/client.go:196	setting event.Meta["topic"] = hive_server2_log
2020-02-21T17:23:29.052Z	DEBUG	[kafka]	kafka/partition.go:132	setting event.Meta["partition"] = 0
2020-02-21T17:23:29.053Z	DEBUG	[kafka]	kafka/client.go:278	finished kafka batch
2020-02-21T17:23:29.053Z	DEBUG	[publisher]	memqueue/ackloop.go:160	ackloop: receive ack [31: 0, 1]
2020-02-21T17:23:29.053Z	DEBUG	[publisher]	memqueue/eventloop.go:535	broker ACK events: count=1, start-seq=5144, end-seq=5144

2020-02-21T17:23:29.053Z	DEBUG	[publisher]	memqueue/ackloop.go:128	ackloop: return ack to broker loop:1
2020-02-21T17:23:29.053Z	DEBUG	[publisher]	memqueue/ackloop.go:131	ackloop:  done send ack
2020-02-21T17:23:29.053Z	DEBUG	[acker]	beater/acker.go:64	stateful ack	{"count": 1}
2020-02-21T17:23:29.053Z	DEBUG	[registrar]	registrar/registrar.go:356	Processing 1 events
2020-02-21T17:23:29.053Z	DEBUG	[registrar]	registrar/registrar.go:326	Registrar state updates processed. Count: 1
2020-02-21T17:23:29.053Z	DEBUG	[registrar]	registrar/registrar.go:411	Write registry file: /home/ubuntu/hive-log-parser/filebeat-7.5.2-linux-x86_64/data/registry/filebeat/data.json (1)
2020-02-21T17:23:29.058Z	DEBUG	[registrar]	registrar/registrar.go:404	Registry file updated. 1 states written.
2020-02-21T17:23:29.604Z	DEBUG	[input]	input/input.go:152	Run input
2020-02-21T17:23:29.604Z	DEBUG	[input]	log/input.go:191	Start next scan
2020-02-21T17:23:29.604Z	DEBUG	[input]	log/input.go:421	Check file for harvesting: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out
2020-02-21T17:23:29.604Z	DEBUG	[input]	log/input.go:511	Update existing file for harvesting: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out, offset: 65566157
2020-02-21T17:23:29.604Z	DEBUG	[input]	log/input.go:563	Harvester for file is still running: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out
2020-02-21T17:23:29.604Z	DEBUG	[input]	log/input.go:212	input states cleaned up. Before: 1, After: 1, Pending: 0
2020-02-21T17:23:30.052Z	DEBUG	[harvester]	log/log.go:107	End of file reached: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out; Backoff now.
2020-02-21T17:23:38.052Z	DEBUG	[harvester]	log/log.go:107	End of file reached: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out; Backoff now.
2020-02-21T17:23:38.052Z	DEBUG	[processors]	processing/processors.go:186	Publish event: {
  "@timestamp": "2020-02-21T17:23:38.052Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.5.2"
  },
  "host": {
    "name": "ip-"
  },
  "agent": {
    "type": "filebeat",
    "ephemeral_id": "bee9392d-b4ef-490d-82d7-f1e329313ec5",
    "hostname": "ip-",
    "id": "c719490a-a7fe-434f-b706-fb73b5091ef8",
    "version": "7.5.2"
  },
  "ecs": {
    "version": "1.1.0"
  },
  "log": {
    "offset": 65566157,
    "file": {
      "path": "/var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out"
    }
  },
  "message": "2020-02-21 17:23:36,773 INFO  org.apache.hadoop.hive.ql.exec.Task: [HiveServer2-Background-Pool: Thread-273032]: 2020-02-21 17:23:36,773 Stage-1 map = 100%,  reduce = 94%, Cumulative CPU 6531.63 sec",
  "input": {
    "type": "log"
  }
}
2020-02-21T17:23:39.052Z	DEBUG	[harvester]	log/log.go:107	End of file reached: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out; Backoff now.
2020-02-21T17:23:39.053Z	DEBUG	[kafka]	kafka/client.go:196	setting event.Meta["topic"] = hive_server2_log
2020-02-21T17:23:39.053Z	DEBUG	[kafka]	kafka/partition.go:132	setting event.Meta["partition"] = 0
2020-02-21T17:23:39.064Z	DEBUG	[kafka]	kafka/client.go:278	finished kafka batch
2020-02-21T17:23:39.064Z	DEBUG	[publisher]	memqueue/ackloop.go:160	ackloop: receive ack [32: 0, 1]
2020-02-21T17:23:39.064Z	DEBUG	[publisher]	memqueue/eventloop.go:535	broker ACK events: count=1, start-seq=5145, end-seq=5145

2020-02-21T17:23:39.064Z	DEBUG	[publisher]	memqueue/ackloop.go:128	ackloop: return ack to broker loop:1
2020-02-21T17:23:39.064Z	DEBUG	[publisher]	memqueue/ackloop.go:131	ackloop:  done send ack
2020-02-21T17:23:39.064Z	DEBUG	[acker]	beater/acker.go:64	stateful ack	{"count": 1}
2020-02-21T17:23:39.064Z	DEBUG	[registrar]	registrar/registrar.go:356	Processing 1 events
2020-02-21T17:23:39.064Z	DEBUG	[registrar]	registrar/registrar.go:326	Registrar state updates processed. Count: 1
2020-02-21T17:23:39.064Z	DEBUG	[registrar]	registrar/registrar.go:411	Write registry file: /home/ubuntu/hive-log-parser/filebeat-7.5.2-linux-x86_64/data/registry/filebeat/data.json (1)
2020-02-21T17:23:39.069Z	DEBUG	[registrar]	registrar/registrar.go:404	Registry file updated. 1 states written.
2020-02-21T17:23:39.596Z	INFO	[monitoring]	log/log.go:145	Non-zero metrics in the last 30s	{"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":270,"time":{"ms":8}},"total":{"ticks":2020,"time":{"ms":25},"value":2020},"user":{"ticks":1750,"time":{"ms":17}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":9},"info":{"ephemeral_id":"bee9392d-b4ef-490d-82d7-f1e329313ec5","uptime":{"ms":210043}},"memstats":{"gc_next":10710528,"memory_alloc":8956008,"memory_total":208085400},"runtime":{"goroutines":37}},"filebeat":{"events":{"added":3,"done":3},"harvester":{"open_files":1,"running":1}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":3,"batches":3,"total":3}},"outputs":{"kafka":{"bytes_read":144,"bytes_write":916}},"pipeline":{"clients":1,"events":{"active":0,"published":3,"total":3},"queue":{"acked":3}}},"registrar":{"states":{"current":1,"update":3},"writes":{"success":3,"total":3}},"system":{"load":{"1":2.38,"15":2.27,"5":2.28,"norm":{"1":0.0595,"15":0.0568,"5":0.057}}}}}}
2020-02-21T17:23:39.604Z	DEBUG	[input]	input/input.go:152	Run input
2020-02-21T17:23:39.604Z	DEBUG	[input]	log/input.go:191	Start next scan
2020-02-21T17:23:39.604Z	DEBUG	[input]	log/input.go:421	Check file for harvesting: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out
2020-02-21T17:23:39.605Z	DEBUG	[input]	log/input.go:511	Update existing file for harvesting: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out, offset: 65566356
2020-02-21T17:23:39.605Z	DEBUG	[input]	log/input.go:563	Harvester for file is still running: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out
2020-02-21T17:23:39.605Z	DEBUG	[input]	log/input.go:212	input states cleaned up. Before: 1, After: 1, Pending: 0
2020-02-21T17:23:41.053Z	DEBUG	[harvester]	log/log.go:107	End of file reached: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out; Backoff now.
2020-02-21T17:23:43.052Z	DEBUG	[multiline]	multiline/multiline.go:175	Multiline event flushed because timeout reached.
2020-02-21T17:23:43.053Z	DEBUG	[processors]	processing/processors.go:186	Publish event: {
  "@timestamp": "2020-02-21T17:23:38.052Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.5.2"
  },
  "input": {
    "type": "log"
  },
  "ecs": {
    "version": "1.1.0"
  },
  "host": {
    "name": "ip-"
  },
  "agent": {
    "id": "c719490a-a7fe-434f-b706-fb73b5091ef8",
    "version": "7.5.2",
    "type": "filebeat",
    "ephemeral_id": "bee9392d-b4ef-490d-82d7-f1e329313ec5",
    "hostname": "ip-"
  },
  "message": "2020-02-21 17:23:37,809 INFO  org.apache.hadoop.hive.ql.exec.Task: [HiveServer2-Background-Pool: Thread-269979]: 2020-02-21 17:23:37,809 Stage-1 map = 99%,  reduce = 99%, Cumulative CPU 262388.23 sec",
  "log": {
    "offset": 65566356,
    "file": {
      "path": "/var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out"
    }
  }
}
2020-02-21T17:23:44.053Z	DEBUG	[kafka]	kafka/client.go:196	setting event.Meta["topic"] = hive_server2_log
2020-02-21T17:23:44.053Z	DEBUG	[kafka]	kafka/partition.go:132	setting event.Meta["partition"] = 0
2020-02-21T17:23:44.055Z	DEBUG	[kafka]	kafka/client.go:278	finished kafka batch
2020-02-21T17:23:44.055Z	DEBUG	[publisher]	memqueue/ackloop.go:160	ackloop: receive ack [33: 0, 1]
2020-02-21T17:23:44.055Z	DEBUG	[publisher]	memqueue/eventloop.go:535	broker ACK events: count=1, start-seq=5146, end-seq=5146

2020-02-21T17:23:44.055Z	DEBUG	[publisher]	memqueue/ackloop.go:128	ackloop: return ack to broker loop:1
2020-02-21T17:23:44.055Z	DEBUG	[publisher]	memqueue/ackloop.go:131	ackloop:  done send ack
2020-02-21T17:23:44.055Z	DEBUG	[acker]	beater/acker.go:64	stateful ack	{"count": 1}
2020-02-21T17:23:44.056Z	DEBUG	[registrar]	registrar/registrar.go:356	Processing 1 events
2020-02-21T17:23:44.056Z	DEBUG	[registrar]	registrar/registrar.go:326	Registrar state updates processed. Count: 1
2020-02-21T17:23:44.056Z	DEBUG	[registrar]	registrar/registrar.go:411	Write registry file: /home/ubuntu/hive-log-parser/filebeat-7.5.2-linux-x86_64/data/registry/filebeat/data.json (1)
2020-02-21T17:23:44.061Z	DEBUG	[registrar]	registrar/registrar.go:404	Registry file updated. 1 states written.
2020-02-21T17:23:45.053Z	DEBUG	[harvester]	log/log.go:107	End of file reached: /var/log/hive/hadoop-cmf-hive-HIVESERVER2.log.out; Backoff now.
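One way to check whether Filebeat itself publishes events out of order is to compare the `log.offset` values of consecutive "Publish event" entries in the debug output. The sketch below is only an illustration: the helper names are hypothetical, and the check is meaningful only for a single file between rotations, since offsets start over when the log rolls.

```python
import re

# Matches the "offset" field inside the JSON of a "Publish event" entry.
# It does not match the unquoted "offset: N" in the input scan lines.
OFFSET_RE = re.compile(r'"offset":\s*(\d+)')


def published_offsets(debug_log_text):
    """Return the log.offset values in the order they were published."""
    return [int(m.group(1)) for m in OFFSET_RE.finditer(debug_log_text)]


def is_strictly_increasing(values):
    """True if every value is larger than the one before it."""
    return all(a < b for a, b in zip(values, values[1:]))


# Offsets copied from the three "Publish event" entries in the excerpt above.
offsets = [65565908, 65566157, 65566356]
print(is_strictly_increasing(offsets))  # prints True
```

In this excerpt the offsets increase, and so do the timestamps inside the three messages, so this part of the log does not show any reordering on the Filebeat side.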
