Max_poll_records doesn't work on startup

The max_poll_records parameter of the kafka input plugin doesn't work on service startup, and the message order is not preserved.

Logstash config:

input {
  kafka {
    bootstrap_servers => "0.dual.kafka.qa-env.com:9092,1.dual.kafka.qa-env.com:9092,2.dual.kafka.qa-env.com:9092"
    topics => ["mt--tmpTest"]
    auto_offset_reset => "latest"
    client_id => "logstash_c1"
    group_id => "logstash_p1"
    max_poll_records => "1"
    consumer_threads => 1
  }
}
filter {
  json {
    source => "message"
  }
  sleep {
    time => "1"
  }
}
output {
  file {
    path => "messages.log"
  }
  http {
    http_method => "post"
    url => "http://sharedservices.qa-env.com:9977/SendMessage?number=%{[number]}"
    headers => { "Content-type" => "application/json; charset=utf-8" }
    format => "json"
    connect_timeout => 60
    retry_failed => false
    pool_max_per_route => 1
  }
}

pipeline.workers=1

Steps to reproduce:

  1. Stop logstash
  2. Send 6 messages to the Kafka topic (see the producer sketch after this list):

{"number": 1}
{"number": 2}
{"number": 3}
{"number": 4}
{"number": 5}
{"number": 6}

  3. Start Logstash
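
For reference, step 2 can be performed with a minimal producer sketch like the one below (the class name SendTestMessages is made up for illustration; the broker address and topic come from the config above). Since the logs later show only a single assigned partition (mt--tmpTest-0), Kafka itself stores these records in order.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper for step 2: sends the six test messages.
public class SendTestMessages {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "0.dual.kafka.qa-env.com:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 6; i++) {
                // All records go to the same single-partition topic,
                // so the broker preserves their order.
                producer.send(new ProducerRecord<>("mt--tmpTest",
                        "{\"number\": " + i + "}"));
            }
            producer.flush(); // make sure everything is on the broker
        }
    }
}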

The expected result:

  1. messages.log has messages:

{"number": 1}
{"number": 2}
{"number": 3}
{"number": 4}
{"number": 5}
{"number": 6}

  2. Logstash sends the requests:

http://sharedservices.qa-env.com:9977/SendMessage?number=1
http://sharedservices.qa-env.com:9977/SendMessage?number=2
http://sharedservices.qa-env.com:9977/SendMessage?number=3
http://sharedservices.qa-env.com:9977/SendMessage?number=4
http://sharedservices.qa-env.com:9977/SendMessage?number=5
http://sharedservices.qa-env.com:9977/SendMessage?number=6

  3. Logstash sends the messages with a 1-second interval between each message

The actual result:

  1. messages.log has the messages:

{"message":"{\"number\": 1}","number":1,"@timestamp":"2019-11-22T10:47:28.193Z","@version":"1"}
{"message":"{\"number\": 2}","number":2,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
{"message":"{\"number\": 3}","number":3,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
{"message":"{\"number\": 4}","number":4,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
{"message":"{\"number\": 5}","number":5,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
{"message":"{\"number\": 6}","number":6,"@timestamp":"2019-11-22T10:47:28.209Z","@version":"1"}
  • OK
  2. Message order is not preserved:

172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=6 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=4 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=2 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=5 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=3 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"
172.24.18.208 - - [22/Nov/2019:10:47:35 +0000] "POST /SendMessage?number=1 HTTP/1.1" 200 0 "-" "Manticore 0.6.4"

  • Fail
  3. The web server received all the messages at the same time, 22/Nov/2019:10:47:35
  • Fail

Logstash logs:

[2019-11-22T10:47:16,287][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-11-22T10:47:16,303][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.5.1"}
[2019-11-22T10:47:22,865][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2019-11-22T10:47:23,115][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x21d2e5a3 run>"}
[2019-11-22T10:47:23,334][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [0.dual.kafka.qa-env.com:9092, 1.dual.kafka.qa-env.com:9092, 2.dual.kafka.qa-env.com:9092]
check.crcs = true
client.id = logstash_c1-0
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = logstash_p1
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2019-11-22T10:47:23,381][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-11-22T10:47:23,740][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.0.0
[2019-11-22T10:47:23,943][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : 3402a8361b734732
[2019-11-22T10:47:24,115][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2019-11-22T10:47:24,506][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: HRGK8wjiRgO-omuVBHtSNA
[2019-11-22T10:47:24,506][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] Discovered group coordinator 0.dual.kafka.qa-env.com:9092 (id: 2147483646 rack: null)
[2019-11-22T10:47:24,506][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] Revoking previously assigned partitions
[2019-11-22T10:47:24,506][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] (Re-)joining group
[2019-11-22T10:47:27,865][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] Successfully joined group with generation 3
[2019-11-22T10:47:27,865][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash_c1-0, groupId=logstash_p1] Setting newly assigned partitions [mt--tmpTest-0]
[2019-11-22T10:47:34,647][INFO ][logstash.outputs.file ] Opening file {:path=>"C:/Programs/logstash/bin/messages.log"}

Why do you expect a one-second delay between posts to the http output?

The logstash pipeline does not preserve the ordering of events.

See the Logstash config:
max_poll_records => "1" combined with sleep { time => "1" }

The logstash pipeline does not preserve the ordering of events.

Why not? I have one pipeline worker and consumer_threads set to 1.

If pipeline.java_execution is enabled (and it is on by default in v7) then events are re-ordered even with "--pipeline.workers 1"

I do not understand why you think introducing a sleep of one second will produce an interval of one second between events. If a batch of events is flushed into the pipeline, the events will indeed be delayed by a second, but if they arrive at the same time they will exit at the same time.
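
To make that concrete, here is a toy model in plain Java (not Logstash internals, just an illustration of the batch semantics described above, using the six test messages as the batch): the per-event sleep happens in the "filter" stage, but the "output" stage only runs once the whole batch has been filtered, so all events leave together, about six seconds after they arrived.

import java.time.Instant;
import java.util.List;

// Toy model of Logstash batch semantics: sleep per event in the
// filter stage, then hand the whole batch to the output at once.
public class BatchSleepDemo {
    public static void main(String[] args) throws InterruptedException {
        List<String> batch = List.of(
                "{\"number\": 1}", "{\"number\": 2}", "{\"number\": 3}",
                "{\"number\": 4}", "{\"number\": 5}", "{\"number\": 6}");

        // "Filter" stage: one second per event, like sleep { time => "1" }.
        for (String event : batch) {
            Thread.sleep(1000);
        }

        // "Output" stage: the batch is emitted in one go, so every event
        // is printed with (almost) the same timestamp.
        for (String event : batch) {
            System.out.println(Instant.now() + " -> " + event);
        }
    }
}

Running it prints six lines whose timestamps differ by microseconds, mirroring the single 10:47:35 burst seen in the web server log above.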

If pipeline.java_execution is enabled (and it is on by default in v7) then events are re-ordered even with "--pipeline.workers 1"

Thank you. I started Logstash with --pipeline.workers 1 --java-execution false, but it didn't help. The message order was still not preserved in the same scenario.

I do not understand why you think introducing a sleep of one second will produce an interval of one second between events. If a batch of events is flushed into the pipeline, the events will indeed be delayed by a second, but if they arrive at the same time they will exit at the same time.

I expect only one message from Kafka per poll request (max_poll_records => 1). As I understand it, there is some time between poll requests, so the messages can't all have the same timestamp. sleep.time is just additional time; maybe that's not correct.
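
For what it's worth, the gap between poll requests is normally negligible: max.poll.records only caps how many records a single poll() call returns, while the underlying network fetch (bounded by fetch.max.bytes / max.partition.fetch.bytes) can pull all pending messages into a client-side buffer in one round trip, after which successive poll() calls are served from memory almost instantly. A minimal standalone consumer sketch (the class name and group id are made up; broker and topic come from the config above, and earliest is used here only so a fresh group re-reads the six test messages) illustrates this:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollTimingDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "0.dual.kafka.qa-env.com:9092");
        props.put("group.id", "poll-timing-demo");   // hypothetical group id
        props.put("auto.offset.reset", "earliest");  // re-read the test messages
        props.put("max.poll.records", "1");          // same limit as in the report
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("mt--tmpTest"));
            long last = System.nanoTime();
            int seen = 0;
            while (seen < 6) {
                // max.poll.records caps what THIS call returns; the records
                // themselves may already sit in the client-side buffer.
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    long now = System.nanoTime();
                    System.out.printf("%s  (+%d ms since previous record)%n",
                            record.value(), (now - last) / 1_000_000);
                    last = now;
                    seen++;
                }
            }
        }
    }
}

After the first fetch completes, the remaining polls typically report gaps of well under a millisecond, which is consistent with the 16 ms spread of the @timestamp values in messages.log above.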

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.