On our production system we use the Kafka input to export data to Elasticsearch. We read the data from about 10 topics and insert it into indices named after the topic plus a date in Elasticsearch.
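By "same name + date" I mean an elasticsearch output roughly along these lines (a simplified sketch just to illustrate the index naming, not the exact reporting pipeline shown further below, which writes to a single index):

```
output {
  elasticsearch {
    hosts => ["http://elasticsearch-client:9200"]
    # one index per topic and day, e.g. received-meteringrequests-2019.06.19
    # (relies on decorate_events => true in the kafka input)
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
  }
}
```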
We run Logstash and Elasticsearch in Kubernetes on AWS.
The image I use is 7.1.1, to which I added the plugin for using the Confluent Schema Registry.
It all runs quite well for a while, but then, without any error, the process just stops.
After a restart it runs again for a while, and then the cycle repeats.
What configuration can help me make this more stable?
This is the config:
```
logstash:
  appVersion: "7.1.1"
  image:
    repository: "registry.cts.appx.cloud/jenkins/cts-logstash-image"
    tag: "20190619081148"
  elasticsearch:
    host: elasticsearch-client
    port: 9200
  replicaCount: 4
  podDisruptionBudget:
    maxUnavailable: 1
  logstashJavaOpts: "-Xmx3g -Xms3g"
  resources:
    limits:
      cpu: 5
      memory: 6Gi
    requests:
      cpu: 500m
      memory: 6Gi
  ## ref: https://github.com/elastic/logstash-docker/blob/master/build/logstash/env2yaml/env2yaml.go
  config:
    config.reload.automatic: "true"
    path.config: /usr/share/logstash/pipeline
    path.data: /usr/share/logstash/data
    ## ref: https://www.elastic.co/guide/en/logstash/current/persistent-queues.html
    queue.checkpoint.writes: 1
    queue.drain: "true"
    queue.max_bytes: 15gb # disk capacity must be greater than the value of `queue.max_bytes`
    queue.type: persisted
    pipeline.workers: 10
    pipeline.id: reporting
    max.poll.records: 400
    session.timeout.ms: 60000
    request.timeout.ms: 80000
    ls.java.opts: "-Xmx3g -Xms3g"
    xpack.monitoring.elasticsearch.hosts: elasticsearch-client
  persistence:
    storageClass: gp2
    accessMode: ReadWriteOnce
    size: 20Gi
  inputs:
    main: |-
      input {
        kafka {
          codec => avro_schema_registry { endpoint => "http://confluentschemaregistry" }
          bootstrap_servers => "kafka-0.broker:9092,kafka-1.broker:9092,kafka-2.broker:9092"
          group_id => "logstash-reporting"
          consumer_threads => 10
          topics => [ "received-meteringrequests", "pushed-meteringrequests", "collected-meteringrequests", "completed-meteringrequests", "delivered-ui-batchrequest-meteringrequests" ]
          decorate_events => true
          value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        }
      }
  filters:
    main: |-
      filter {
        mutate {
          id => "REPORTING_CORRELATIONID"
          add_field => { "[@metadata][correlationId]" => "%{correlationId}" }
        }
        if [@metadata][kafka][topic] == "received-meteringrequests" {
          mutate {
            id => "REPORT_RECEIVED"
            add_field => { "intakeDateTime" => "%{[batchRequestInfo][intakeDateTime]}" }
            add_field => { "senderID" => "%{[batchRequestInfo][senderID]}" }
            add_field => { "roleID" => "%{[batchRequestInfo][roleID]}" }
            add_field => { "receivedTimestamp" => "%{[@metadata][kafka][timestamp]}" }
            add_field => { "queryReason" => "%{[batchRequestElementInfo][queryReason]}" }
          }
        }
        if [@metadata][kafka][topic] == "pushed-meteringrequests" {
          mutate {
            id => "REPORT_PUSHED"
            add_field => { "pushedTimestamp" => "%{[@metadata][kafka][timestamp]}" }
          }
        }
        if [@metadata][kafka][topic] == "collected-meteringrequests" {
          mutate {
            id => "REPORT_COLLECTED"
            add_field => { "collectedTimestamp" => "%{[@metadata][kafka][timestamp]}" }
          }
        }
        if [@metadata][kafka][topic] == "completed-meteringrequests" {
          if [rejectionCode] {
            mutate {
              id => "REPORT_COMPLETED_REJECTION"
              add_field => { "rejectionCode" => "%{[rejectionCode]}" }
              add_field => { "rejectedTimestamp" => "%{[@metadata][kafka][timestamp]}" }
            }
          } else {
            mutate {
              id => "REPORT_COMPLETED_DATA"
              add_field => { "completedTimestamp" => "%{[completedDateTime]}" }
            }
          }
        }
        if [@metadata][kafka][topic] == "delivered-ui-batchrequest-meteringrequests" {
          mutate {
            id => "REPORT_DELIVERED"
            add_field => { "deliveredTimestamp" => "%{[@metadata][kafka][timestamp]}" }
          }
        }
        if [@metadata][kafka][topic] == "correlated-id-ui-facilitatingrnb-completed-meteringrequests" {
          mutate {
            id => "FACILITATING_RNB_CORRELATION_ID"
            add_field => { "facilitatingRNB" => "%{[facilitatingRNB]}" }
          }
        }
        if [@metadata][kafka][topic] == "routed-ui-facilitating-rnb-completed-meteringrequest" {
          mutate {
            id => "FACILITATING_RNB"
            add_field => { "facilitatingRNB" => "%{[facilitatingRNB]}" }
          }
        }
        mutate {
          id => "REPORT_REMOVEFIELDS"
          remove_field => [ "correlationId", "responseDateTime", "collectedMeterReading", "batchRequestInfo", "routedBatchRequestInfo", "responseTimeMillis", "deliveredBatchRequestInfo", "externalRnbRejectionCode", "batchDeliveredCorrelationId", "readingRequestDateTime", "elements", "batchRequestElementInfo", "externalRnbID", "energyMeterInfo", "rejectionCode", "[@timestamp]", "facilitatingRNB", "[@version]", "headEndPushInfo", "completedDateTime" ]
        }
      }
  outputs:
    main: |-
      output {
        elasticsearch {
          hosts => ["http://elasticsearch-client:9200"]
          manage_template => true
          doc_as_upsert => true
          action => "update"
          index => "cts-reporting-portaal"
          document_id => "%{[@metadata][correlationId]}"
        }
      }
```
[details="Summary"]
This text will be hidden
[/details]