Kafka consumers seem to stop working after a period

On our production system we use the Kafka input to export data to Elasticsearch. We read the data from about 10 topics and insert it into indices with the same name as the topic plus a date.
We run Logstash and Elasticsearch in Kubernetes on AWS.
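
For illustration, that index naming scheme in a Logstash elasticsearch output would look something like this (a sketch only; the production output shown further down actually writes to a fixed index):

```
output {
  elasticsearch {
    hosts => ["http://elasticsearch-client:9200"]
    # one index per source topic, suffixed with the event date
    # ([@metadata][kafka][topic] is available because decorate_events is on)
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
  }
}
```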

The image I use is based on Logstash 7.1.1, with the plugin added for use of the Confluent Schema Registry.
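
Roughly how that image is built (a minimal sketch; the real image comes out of a Jenkins build, see the registry tag in the values below):

```
FROM docker.elastic.co/logstash/logstash:7.1.1
# add the schema-registry-aware Avro codec used by the kafka input
RUN bin/logstash-plugin install logstash-codec-avro_schema_registry
```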

It all runs fine for a while, but then, without any error, the process just stops consuming.

[screenshot: 2019-06-27_10-45-03]

After a restart it runs again for a while, and then the cycle repeats.
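
One way I can check whether the consumers themselves have stalled (rather than the Elasticsearch output) is to describe the consumer group with Kafka's own tooling, for example from one of the broker pods:

```
# shows current offset, log-end offset and lag per partition for the group
kafka-consumer-groups.sh --bootstrap-server kafka-0.broker:9092 \
  --describe --group logstash-reporting
```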

What configuration can help me make this more stable?

This is the config:

```
logstash:
  appVersion: "7.1.1"
  image:
    repository: "registry.cts.appx.cloud/jenkins/cts-logstash-image"
    tag: "20190619081148"
  elasticsearch:
    host: elasticsearch-client
    port: 9200

  replicaCount: 4

  podDisruptionBudget:
    maxUnavailable: 1

  logstashJavaOpts: "-Xmx3g -Xms3g"
  
  resources:
    limits:
      cpu: 5
      memory: 6Gi
    requests:
      cpu: 500m
      memory: 6Gi

## ref: https://github.com/elastic/logstash-docker/blob/master/build/logstash/env2yaml/env2yaml.go
  config:
    config.reload.automatic: "true"
    path.config: /usr/share/logstash/pipeline
    path.data: /usr/share/logstash/data

    ## ref: https://www.elastic.co/guide/en/logstash/current/persistent-queues.html
    queue.checkpoint.writes: 1
    queue.drain: "true"
    queue.max_bytes: 15gb  # disk capacity must be greater than the value of `queue.max_bytes`
    queue.type: persisted
    pipeline.workers: 10
    pipeline.id: reporting
    max.poll.records: 400
    session.timeout.ms: 60000
    request.timeout.ms: 80000
    ls.java.opts: "-Xmx3g -Xms3g"
    xpack.monitoring.elasticsearch.hosts: elasticsearch-client
    
  persistence:
    storageClass: gp2
    accessMode: ReadWriteOnce
    size: 20Gi

  inputs:
    main: |-
      input {
        kafka {
          codec => avro_schema_registry { endpoint => "http://confluentschemaregistry" }
          bootstrap_servers => "kafka-0.broker:9092,kafka-1.broker:9092,kafka-2.broker:9092"
          group_id => "logstash-reporting"
          consumer_threads => 10
          topics => [ "received-meteringrequests", "pushed-meteringrequests", "collected-meteringrequests", "completed-meteringrequests", "delivered-ui-batchrequest-meteringrequests" ]
          decorate_events => true
          value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        }
      }

  filters:
    main: |-
      filter {
        mutate {
          id => "REPORTING_CORRELATIONID"
          add_field => { "[@metadata][correlationId]" => "%{correlationId}" }
        }
        if [@metadata][kafka][topic] == "received-meteringrequests" {
          mutate {
            id => "REPORT_RECEIVED"
            add_field => { "intakeDateTime" => "%{[batchRequestInfo][intakeDateTime]}" }
            add_field => { "senderID" => "%{[batchRequestInfo][senderID]}" }
            add_field => { "roleID" => "%{[batchRequestInfo][roleID]}" }
            add_field => { "receivedTimestamp" => "%{[@metadata][kafka][timestamp]}" }
            add_field => { "queryReason" => "%{[batchRequestElementInfo][queryReason]}" }
          }
        }
        if [@metadata][kafka][topic] == "pushed-meteringrequests" {
          mutate {
            id => "REPORT_PUSHED"
            add_field => { "pushedTimestamp" => "%{[@metadata][kafka][timestamp]}" }
          }
        }
        if [@metadata][kafka][topic] == "collected-meteringrequests" {
          mutate {
            id => "REPORT_COLLECTED"
            add_field => { "collectedTimestamp" => "%{[@metadata][kafka][timestamp]}" }
          }
        }
        if [@metadata][kafka][topic] == "completed-meteringrequests" {
          if [rejectionCode] {
            mutate {
              id => "REPORT_COMPLETED_REJECTION"
              add_field => { "rejectionCode" => "%{[rejectionCode]}" }
              add_field => { "rejectedTimestamp" => "%{[@metadata][kafka][timestamp]}" }
            }
          } else {
            mutate {
              id => "REPORT_COMPLETED_DATA"
              add_field => { "completedTimestamp" => "%{[completedDateTime]}" }
            }
          }
        }
        if [@metadata][kafka][topic] == "delivered-ui-batchrequest-meteringrequests" {
          mutate {
            id => "REPORT_DELIVERED"
            add_field => { "deliveredTimestamp" => "%{[@metadata][kafka][timestamp]}" }
          }
        }
        if [@metadata][kafka][topic] == "correlated-id-ui-facilitatingrnb-completed-meteringrequests" {
          mutate {
            id => "FACILITATING_RNB_CORRELATION_ID"
            add_field => { "facilitatingRNB" => "%{[facilitatingRNB]}" }
          }
        }
        if [@metadata][kafka][topic] == "routed-ui-facilitating-rnb-completed-meteringrequest" {
          mutate {
            id => "FACILITATING_RNB"
            add_field => { "facilitatingRNB" => "%{[facilitatingRNB]}" }
          }
        }


        mutate {
          id => "REPORT_REMOVEFIELDS"
          remove_field => [ "correlationId", "responseDateTime", "collectedMeterReading", "batchRequestInfo", "routedBatchRequestInfo", "responseTimeMillis", "deliveredBatchRequestInfo", "externalRnbRejectionCode", "batchDeliveredCorrelationId", "readingRequestDateTime", "elements", "batchRequestElementInfo", "externalRnbID", "energyMeterInfo", "rejectionCode", "[@timestamp]", "facilitatingRNB", "[@version]", "headEndPushInfo", "completedDateTime" ]
        }
      }
  outputs:
    main: |-
      output {
        elasticsearch {
          hosts => ["http://elasticsearch-client:9200"]
          manage_template => true
          doc_as_upsert => true
          action => "update"
          index => "cts-reporting-portaal"
          document_id => "%{[@metadata][correlationId]}"
        }
      }
```
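
One thing I am not sure about: max.poll.records, session.timeout.ms and request.timeout.ms are Kafka consumer properties rather than Logstash settings, so I don't know whether logstash.yml picks them up at all. If they belong on the kafka input instead, the snake_case equivalents would look something like this (a sketch based on the logstash-input-kafka option names):

```
input {
  kafka {
    # equivalents of the logstash.yml entries above, as input plugin options
    max_poll_records => "400"
    session_timeout_ms => "60000"
    request_timeout_ms => "80000"
  }
}
```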
[details="Summary"]
This text will be hidden
[/details]

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.