Logstash shuts down after setting offset for Kafka partition

This is my logstash.conf:

input {
  kafka {
    bootstrap_servers => "kafka-app-logs:9092"
    topics => ["pm2-logs"]
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
}

filter {
  mutate {
    gsub => [ "message", "\n", " " ]
  }
}

output {
  elasticsearch {
    hosts => "app-logs.local:9200"
    index => "pm2-logs"
  }
}

These are the logs:

Pipeline started {"pipeline.id"=>"main"}
Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [kafka-app-logs.local:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id = logstash-0
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = logstash
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
Kafka version: 2.3.0
Kafka commitId: fc1aaa116b661c8a
Kafka startTimeMs: 1592213391234
 Subscribed to topic(s): pm2-logs
 Successfully started Logstash API endpoint {:port=>9600}
 Cluster ID: RVTI2y7EStiP-FCfnWxyDg
Discovered group coordinator 10.122.25.33:9092 (id: 2147482645 rack: null)
Revoking previously assigned partitions []
(Re-)joining group
Successfully joined group with generation 1958
Setting newly assigned partitions: pm2-logs-3, pm2-logs-4, pm2-logs-5
Setting offset for partition pm2-logs-4 to the committed offset FetchPosition{offset=5854066771, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=10.122.41.58:9092 (id: 1003 rack: null), epoch=-1}}
Setting offset for partition pm2-logs-5 to the committed offset FetchPosition{offset=5834136277, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=10.122.21.204:9092 (id: 1005 rack: null), epoch=-1}}
Setting offset for partition pm2-logs-3 to the committed offset FetchPosition{offset=5834144120, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=10.122.4.64:9092 (id: 1002 rack: null), epoch=-1}}
Attempt to heartbeat failed since group is rebalancing
Revoking previously assigned partitions [pm2-logs-3, pm2-logs-4, pm2-logs-5]
(Re-)joining group
Successfully joined group with generation 1959
Setting newly assigned partitions: pm2-logs-4, pm2-logs-5
Setting offset for partition pm2-logs-4 to the committed offset FetchPosition{offset=5854984158, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=10.122.41.58:9092 (id: 1003 rack: null), epoch=-1}}
Setting offset for partition pm2-logs-5 to the committed offset FetchPosition{offset=5835053664, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=10.122.21.204:9092 (id: 1005 rack: null), epoch=-1}}
SIGTERM received. Shutting down.
Member logstash-0-e9db9bb6-c663-4f38-9a3e-d3ae1804b00c sending LeaveGroup request to coordinator 10.122.4.46:9092 (id: 2147482645 rack: null)
Pipeline terminated {"pipeline.id"=>"main"}
Logstash shut down.

Can you please help me figure out what the issue could be?
I am using this config to ship logs from Kafka to Elasticsearch with the multiline codec plugin, and the same config works on another Logstash server.
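
For readers unfamiliar with the config, here is a minimal Python sketch of what the multiline codec (`pattern => "^\s"`, `what => "previous"`) followed by the `gsub` filter is intended to do to the messages. It is an illustration only, not Logstash's implementation; the function names and the sample lines are hypothetical.

```python
import re

# Same pattern as the multiline codec in the config: a line that starts
# with whitespace is treated as a continuation of the previous line.
CONTINUATION = re.compile(r"^\s")


def merge_multiline(lines):
    """Join continuation lines onto the previous event (what => "previous")."""
    events = []
    for line in lines:
        if events and CONTINUATION.search(line):
            events[-1] += "\n" + line
        else:
            events.append(line)
    return events


def flatten(event):
    """Rough equivalent of gsub => [ "message", "\n", " " ]."""
    return event.replace("\n", " ")


# Hypothetical sample input: a stack trace followed by an unrelated line.
sample = [
    "Error: boom",
    "    at foo (app.js:1:1)",
    "    at bar (app.js:2:2)",
    "next message",
]

events = [flatten(e) for e in merge_multiline(sample)]
for e in events:
    print(e)
```

With this sample, the three stack-trace lines become a single event with the newlines replaced by spaces, and "next message" stays a separate event.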

Hi @Aliasgar_Choolawala - Welcome to our community forums!

Your team has reached out to Elastic Support and we will be working with you via the support case.

Thank you.
