Logstash Elasticsearch-to-Kafka pipeline stops right after starting


(Aurélien) #1

Dear all,

I'm trying to export documents from an Elasticsearch index to a Kafka topic using Logstash. However, when I start Logstash with only the elk-to-kafka configuration file, it starts and then stops almost immediately, after which systemd restarts Logstash again and again.

If I use another Logstash configuration, for example one that consumes data from a Kafka topic and sends it to an Elasticsearch index, it works.

If I use both configuration files as multiple pipelines, the configuration that exports documents from Elasticsearch to Kafka does nothing, while the configuration that consumes data from Kafka and sends it to Elasticsearch works; Logstash keeps running and systemd does not restart it.

I'm using Debian-based packages for Elasticsearch, Zookeeper and Logstash, installed on elementary OS Loki, which is based on Ubuntu 16.04.

Logstash 6.1.0, Elasticsearch 6.1.0, Zookeeper 3.4.8-1, Kafka 2.12-1.0.0.

Thanks for your help.

Here is my Logstash configuration to export an index to a Kafka topic:

input {
  elasticsearch {
    hosts => "localhost:9200"
    index => "poc-local-input"
    query => '{ "query": { "match_all": {} } }'
    user => elastic
    password => elastic
  }
}

output {
  kafka {
    client_id => "poc-local-elk-output-0"
    codec => plain
    topic_id => "poc-local-output"
  }
}
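For what it's worth, the elasticsearch input plugin runs its query once and then finishes unless it is given a schedule; a sketch of the same input with a cron-style schedule (assumption: the schedule option is available in the plugin version bundled here):

```
input {
  elasticsearch {
    hosts => "localhost:9200"
    index => "poc-local-input"
    query => '{ "query": { "match_all": {} } }'
    # re-run the query every minute instead of once (hypothetical for this setup)
    schedule => "* * * * *"
    user => elastic
    password => elastic
  }
}
```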

The Logstash configuration to consume a topic and send the data to an index:

input {
  kafka {
    topics => ["poc-local-input"]
    client_id => "poc-local-input-0"
    group_id => "poc-local-input"
  }
}

filter {
  dissect {
    mapping => { "message" => "%{org_date_str} %{field1} %{field2} %{field3} %{field4} %{lat} %{lon}" }
  }

  date {
    match => [ "org_date_str", "UNIX_MS" ]
    target => "org_date"
  }

  mutate {
    rename => {
      "lon" => "[coordinate][lon]"
      "lat" => "[coordinate][lat]"
    }
    # mutate applies rename before convert, so convert must
    # reference the renamed fields, not the original ones
    convert => {
      "[coordinate][lon]" => "float"
      "[coordinate][lat]" => "float"
    }
    # input_latency is set as a float by the ruby filter below,
    # so it needs no convert here
    remove_field => [ "org_date_str" ]
  }

  ruby {
    init => "require 'time'"
    code => "
      latency = event.get('@timestamp') - event.get('org_date')
      event.set('input_latency', latency)
    "
    add_tag => [ "calculated_input_latency" ]
  }
}

output {
  elasticsearch {
    index => "poc-local-input"
    hosts => "localhost:9200"
    user => elastic
    password => elastic
  }
}
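As a sanity check on the UNIX_MS parsing above, the epoch value from the sample line below does decode to a plausible timestamp (a sketch using GNU date, dropping the milliseconds):

```shell
# 1513696417052 ms -> 1513696417 s since the epoch (GNU date assumed)
date -u -d @1513696417 '+%Y-%m-%dT%H:%M:%SZ'
# prints 2017-12-19T15:13:37Z
```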

My pipelines.yml:

- pipeline.id: poc-local-elk-output
  path.config: "/logstash-conf/poc-local-elk-output.conf"
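For the multiple-pipelines test described earlier, pipelines.yml would presumably list both pipelines; a sketch, with a hypothetical name for the second configuration file:

```yaml
- pipeline.id: poc-local-elk-output
  path.config: "/logstash-conf/poc-local-elk-output.conf"
- pipeline.id: poc-local-kafka-input
  path.config: "/logstash-conf/poc-local-kafka-input.conf"  # hypothetical path
```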

I modified the /etc/systemd/system/logstash.service file to remove the "--path.settings" "/etc/logstash" arguments, and commented out the path.config line in /etc/logstash/logstash.yml, so that the Logstash systemd service works with multiple pipelines.
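Concretely, the stock 6.x unit file ships an ExecStart like the first line below; the edit described removes the settings arguments (a sketch, assuming the stock unit file):

```ini
# before (stock unit file shipped with the Debian package)
ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"
# after the described modification
ExecStart=/usr/share/logstash/bin/logstash
```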

My init script:

#!/usr/bin/env bash
kafka_version="2.12-1.0.0"
kafka_dir="/opt/kafka/kafka_$kafka_version/"
kafka_bin_dir="$kafka_dir/bin"
kafka_topics="$kafka_bin_dir/kafka-topics.sh"

$kafka_topics --zookeeper localhost:2181 --delete --topic poc-local-input
$kafka_topics --zookeeper localhost:2181 --delete --topic poc-local-output

sleep 3

$kafka_topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic poc-local-input
$kafka_topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic poc-local-output

curl -u elastic:elastic -XDELETE localhost:9200/poc-local-input
curl -u elastic:elastic -XDELETE localhost:9200/poc-local-output

curl -u elastic:elastic -XPUT localhost:9200/poc-local-input
curl -u elastic:elastic -XPUT localhost:9200/poc-local-output

curl -u elastic:elastic -XPUT localhost:9200/poc-local-input/_mapping/doc -H 'Content-Type: application/json' -d '
{
  "dynamic": "false",
  "properties": {
    "@timestamp":    { "type": "date" },
    "org_date":      { "type": "date" },
    "lat":           { "type": "float" },
    "lon":           { "type": "float" },
    "coordinate":    { "type": "geo_point" },
    "input_latency": { "type": "float" }
  }
}'

curl -u elastic:elastic -XPUT localhost:9200/poc-local-output/_mapping/log -H 'Content-Type: application/json' -d '
{
  "dynamic": "false",
  "properties": {
    "@timestamp": { "type": "date" },
    "org_date":   { "type": "date" },
    "latency":    { "type": "float" }
  }
}'

An example of a string produced to the Kafka topic, to be ingested into Elasticsearch:

1513696417052 In35 6saQ i37Z yqA2 -79.0533 -74.1763
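The dissect mapping above is a plain space-split; a rough shell equivalent (not Logstash itself) applied to this sample line:

```shell
line="1513696417052 In35 6saQ i37Z yqA2 -79.0533 -74.1763"
# split on single spaces, like the dissect mapping does
read -r org_date_str field1 field2 field3 field4 lat lon <<< "$line"
echo "org_date_str=$org_date_str lat=$lat lon=$lon"
# prints org_date_str=1513696417052 lat=-79.0533 lon=-74.1763
```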

An example of the corresponding string which should be produced by the Logstash configuration and sent to a Kafka topic:

2017-12-19T15:21:14.560Z %{host} 1513696417052 In35 6saQ i37Z yqA2 -79.0533 -74.1763

When there is already data in the Elasticsearch index and I restart Logstash, the elk-to-kafka configuration sends the existing entries to the topic before stopping.

Also, notice the %{host}, which according to the Logstash documentation should be replaced by the current host, but is not.
From https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html#plugins-outputs-kafka-common-options :

The default codec is plain. Logstash will encode your events with not only the message field but also with a timestamp and hostname.
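If only the raw message is wanted on the topic, without the timestamp/host prefix, the plain codec takes a format option; a sketch of the kafka output with it set (untested in this setup):

```
output {
  kafka {
    client_id => "poc-local-elk-output-0"
    # emit only the message field instead of "timestamp host message"
    codec => plain { format => "%{message}" }
    topic_id => "poc-local-output"
  }
}
```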


(Aurélien) #2

Here is the /var/log/syslog:

Dec 19 16:56:21 aurelien systemd[1]: Started logstash.
Dec 19 16:56:30 aurelien logstash[20755]: Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
Dec 19 16:56:31 aurelien logstash[20755]: SLF4J: Class path contains multiple SLF4J bindings.
Dec 19 16:56:31 aurelien logstash[20755]: SLF4J: Found binding in [jar:file:/usr/share/logstash/logstash-core/lib/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
Dec 19 16:56:31 aurelien logstash[20755]: SLF4J: Found binding in [jar:file:/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.4/vendor/jar-dependencies/runtime-jars/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
Dec 19 16:56:31 aurelien logstash[20755]: SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
Dec 19 16:56:31 aurelien logstash[20755]: SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]


(Aurélien) #3

An extract of logstash.log (pasting as text is limited and I don't know how to share the whole log):

[2017-12-19T16:56:30,473][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.1.0"}
[2017-12-19T16:56:30,560][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-12-19T16:56:31,036][INFO ][org.apache.kafka.clients.producer.ProducerConfig] ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id = poc-local-elk-output-0
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 10
reconnect.backoff.ms = 10
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[2017-12-19T16:56:31,096][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.11.0.0
[2017-12-19T16:56:31,097][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : cb8625948210849f
[2017-12-19T16:56:31,102][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"poc-local-elk-output", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x3da87f81@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:245 run>"}
[2017-12-19T16:56:31,164][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"poc-local-elk-output"}
[2017-12-19T16:56:31,503][INFO ][org.apache.kafka.clients.producer.KafkaProducer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.


(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.