max_poll_records doesn't work on startup

The max_poll_records parameter of the kafka input plugin does not take effect on service startup, and message order is not preserved.
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

Logstash config:

input {
  kafka {
    bootstrap_servers => "0.dual.kafka.qa-env.com:9092,1.dual.kafka.qa-env.com:9092,2.dual.kafka.qa-env.com:9092"
    topics => ["mt--tmpTest"]
    auto_offset_reset => "latest"
    client_id => "logstash_c1"
    group_id => "logstash_p1"
    max_poll_records => "1"
    consumer_threads => 1
  }
}

filter {
  json {
    source => "message"
  }
  sleep {
    time => "1"
  }
}

output {
  file {
    path => "messages.log"
  }
  http {
    http_method => "post"
    url => "http://sharedservices.qa-env.com:9977/SendMessage?number=%{[number]}"
    headers => { "Content-type" => "application/json; charset=utf-8" }
    format => "json"
    connect_timeout => 60
    retry_failed => false
    pool_max_per_route => 1
  }
}

Logstash is run with pipeline.workers=1.

Steps to reproduce:

  1. Stop Logstash.
  2. Send six messages to the Kafka topic (for example, with the producer sketch shown after these steps):

{"number": 1}
{"number": 2}
{"number": 3}
{"number": 4}
{"number": 5}
{"number": 6}

  3. Start Logstash.
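
For reference, the six messages can be published with any Kafka producer. Below is a minimal sketch using the plain Java client (kafka-clients 2.0.0, the version shown in the logs below); the broker address and topic come from the config above, the class name is arbitrary, and the stock kafka-console-producer tool works just as well:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SendTestMessages {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "0.dual.kafka.qa-env.com:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The topic has a single partition (mt--tmpTest-0 in the logs),
            // so the six records are stored in send order.
            for (int n = 1; n <= 6; n++) {
                producer.send(new ProducerRecord<>("mt--tmpTest", "{\"number\": " + n + "}"));
            }
            producer.flush(); // make sure everything is on the broker before stopping
        }
    }
}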

The expected result:

  1. messages.log contains the messages:

{"number": 1}
{"number": 2}
{"number": 3}
{"number": 4}
{"number": 5}
{"number": 6}

  2. Logstash sends the requests:

http://sharedservices.qa-env.com:9977/SendMessage?number=1
http://sharedservices.qa-env.com:9977/SendMessage?number=2
http://sharedservices.qa-env.com:9977/SendMessage?number=3
http://sharedservices.qa-env.com:9977/SendMessage?number=4
http://sharedservices.qa-env.com:9977/SendMessage?number=5
http://sharedservices.qa-env.com:9977/SendMessage?number=6

  3. Logstash sends the messages with an interval of 1 second between each message.

The actual result:

  1. messages.log contains the messages:
{"message":"{\"number\": 1}","number":1,"@timestamp":"2019-11-22T10:47:28.193Z","@version":"1"}
{"message":"{\"number\": 2}","number":2,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
{"message":"{\"number\": 3}","number":3,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
{"message":"{\"number\": 4}","number":4,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
{"message":"{\"number\": 5}","number":5,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
{"message":"{\"number\": 6}","number":6,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
  • OK
  2. Message order is not preserved:

172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=6 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=4 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=2 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=5 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=3 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=1 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"

  • Fail
  3. The web server received all messages at the same time, 22/Nov/2019:10:47:35.
  • Fail

Logstash logs:

[2019-11-22T10:47:16,287][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-11-22T10:47:16,303][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.5.1"}
[2019-11-22T10:47:22,865][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2019-11-22T10:47:23,115][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x21d2e5a3 run>"}
[2019-11-22T10:47:23,334][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [0.dual.kafka.qa-env.com:9092, 1.dual.kafka.qa-env.com:9092, 2.dual.kafka.qa-env.com:9092]
check.crcs = true
client.id = logstash_c1-0
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = logstash_p1
heartbeat.interval.ms = 3000
interceptor.classes =
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2019-11-22T10:47:23,381][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>}
[2019-11-22T10:47:23,740][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.0.0
[2019-11-22T10:47:23,943][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : 3402a8361b734732
[2019-11-22T10:47:24,115][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2019-11-22T10:47:24,506][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: HRGK8wjiRgO-omuVBHtSNA
[2019-11-22T10:47:24,506][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] Discovered group coordinator 0.dual.kafka.qa-env.com:9092 (id: 2147483646 rack: null)
[2019-11-22T10:47:24,506][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] Revoking previously assigned partitions
[2019-11-22T10:47:24,506][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] (Re-)joining group
[2019-11-22T10:47:27,865][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] Successfully joined group with generation 3
[2019-11-22T10:47:27,865][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] Setting newly assigned partitions [mt--tmpTest-0]
[2019-11-22T10:47:34,647][INFO ][logstash.outputs.file ] Opening file {:path=>"C:/Programs/logstash/bin/messages.log"}

Why do you expect a one second delay between posts to the http output?

The logstash pipeline does not preserve the ordering of events.

See the logstash config: max_poll_records => "1" combined with sleep { time => "1" }.

> The logstash pipeline does not preserve the ordering of events.

Why not? I have one pipeline worker and consumer_threads set to 1.

If pipeline.java_execution is enabled (and it is on by default in v7) then events are re-ordered even with "--pipeline.workers 1"

I do not understand why you think introducing a sleep of one second will produce an interval of one second between events. If a batch of events are flushed into the pipeline they will indeed be delayed by a second, but if they arrive at the same time they will exit at the same time.
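
To make that concrete, here is a toy model (plain Java, not Logstash internals) of one worker handling one batch, under the assumption that a batch only moves on to the outputs after the whole filter stage has run:

public class BatchSleepModel {
    public static void main(String[] args) throws InterruptedException {
        String[] batch = {
            "{\"number\": 1}", "{\"number\": 2}", "{\"number\": 3}",
            "{\"number\": 4}", "{\"number\": 5}", "{\"number\": 6}"
        };
        // Filter stage: the sleep filter delays each event in turn,
        // so the batch as a whole is held up for ~6 seconds...
        for (int i = 0; i < batch.length; i++) {
            Thread.sleep(1000);
        }
        // ...but the events only reach the outputs once the whole filter
        // stage is done, so they all exit at (roughly) the same instant.
        for (String event : batch) {
            System.out.println(System.currentTimeMillis() + " POST " + event);
        }
    }
}

This matches the observed timeline: the partition was assigned at 10:47:27, the events were stamped 10:47:28, and all six HTTP posts landed together at 10:47:35, roughly six one-second sleeps later.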

> If pipeline.java_execution is enabled (and it is on by default in v7) then events are re-ordered even with "--pipeline.workers 1"

Thank you. I started Logstash with --pipeline.workers 1 --java-execution false, but it did not help: message order was still not preserved in the same scenario.

> I do not understand why you think introducing a sleep of one second will produce an interval of one second between events. If a batch of events are flushed into the pipeline they will indeed be delayed by a second, but if they arrive at the same time they will exit at the same time.

I expect only one message from Kafka per poll request (max_poll_records => "1"). As I understand it, there is some time between poll requests, so the messages should not all arrive at the same instant. The sleep time is just additional spacing; maybe that understanding is not correct.
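
As far as I know, max.poll.records only caps how many records a single poll() returns from data the client has already fetched; it does not force a broker round trip per record, so consecutive polls can drain a prefetched batch back-to-back with effectively no gap. A minimal sketch with the plain Java client that makes the timing visible (the group id is made up for this test; broker, topic, and max.poll.records mirror the config above):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PollTiming {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "0.dual.kafka.qa-env.com:9092");
        props.put("group.id", "poll-timing-test"); // hypothetical group, not the one from the report
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "1"); // same value the plugin passes through
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mt--tmpTest"));
            for (int i = 0; i < 6; i++) {
                // Each poll() returns at most one record, but after the first
                // network fetch the remaining records come from the local buffer.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> rec : records) {
                    System.out.println(System.nanoTime() + " offset=" + rec.offset() + " " + rec.value());
                }
            }
        }
    }
}

If the records are already buffered, the printed timestamps come out microseconds apart, which would explain how the Kafka input can hand the whole backlog to the pipeline as one batch despite max_poll_records => "1".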