Dear all,
I'm trying to export documents from an Elasticsearch index to a Kafka topic using Logstash. However, when I start Logstash with only the elk-to-kafka configuration file, it starts and then stops almost immediately, and systemd restarts Logstash again and again.
If I use another Logstash configuration, for example one that consumes data from a topic and sends it to an Elasticsearch index, it works.
If I use both configuration files as multiple pipelines, the configuration that exports documents from Elasticsearch to Kafka does nothing, but the configuration that consumes data from Kafka and sends it to Elasticsearch works, and Logstash keeps running; systemd does not restart it.
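In the failing single-pipeline case, the service logs can be watched while systemd restarts Logstash; the paths below assume the default layout of the Debian packages:

# follow the systemd unit log
journalctl -u logstash -f
# the Logstash log itself
tail -f /var/log/logstash/logstash-plain.log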
I'm using Debian-based packages for Elasticsearch, Zookeeper and Logstash, installed on Elementary OS Loki, which is based on Ubuntu 16.04.
Versions: Logstash 6.1.0, Elasticsearch 6.1.0, Zookeeper 3.4.8-1, Kafka 2.12-1.0.0.
Thanks for your help.
Here is my Logstash configuration to export an index to a Kafka topic:
input {
  elasticsearch {
    hosts => "localhost:9200"
    index => "poc-local-input"
    query => '{ "query": { "match_all": {} } }'
    user => elastic
    password => elastic
  }
}

output {
  kafka {
    client_id => "poc-local-elk-output-0"
    codec => plain
    topic_id => "poc-local-output"
  }
}
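To see why this pipeline stops, it can also be run in the foreground outside systemd; the binary path below assumes the default layout of the Debian package:

# run the single configuration in the foreground with debug logging
sudo /usr/share/logstash/bin/logstash -f /logstash-conf/poc-local-elk-output.conf --log.level=debug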
The Logstash configuration to consume a topic and then send the data to an index:
input {
  kafka {
    topics => 'poc-local-input'
    client_id => "poc-local-input-0"
    group_id => "poc-local-input"
  }
}

filter {
  dissect {
    mapping => { "message" => "%{org_date_str} %{field1} %{field2} %{field3} %{field4} %{lat} %{lon}" }
  }
  date {
    match => [ "org_date_str", "UNIX_MS" ]
    target => "org_date"
  }
  mutate {
    rename => {
      "lon" => "[coordinate][lon]"
      "lat" => "[coordinate][lat]"
    }
    # rename runs before convert inside a single mutate, so the fields
    # must be converted under their new nested paths
    convert => {
      "[coordinate][lon]" => "float"
      "[coordinate][lat]" => "float"
    }
    remove_field => [
      "org_date_str"
    ]
  }
  # input_latency is set here, after the mutate above, as the difference
  # between the ingestion timestamp and the original event date
  ruby {
    init => "require 'time'"
    code => "
      latency = event.get('@timestamp') - event.get('org_date')
      event.set('input_latency', latency)
    "
    add_tag => [ "calculated_input_latency" ]
  }
}

output {
  elasticsearch {
    index => "poc-local-input"
    hosts => "localhost:9200"
    user => elastic
    password => elastic
  }
}
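To feed this pipeline, a sample line (the example string shown further below) can be pushed to the input topic with the console producer shipped with Kafka:

echo "1513696417052 In35 6saQ i37Z yqA2 -79.0533 -74.1763" | /opt/kafka/kafka_2.12-1.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic poc-local-input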
My pipelines.yml:
- pipeline.id: poc-local-elk-output
  path.config: "/logstash-conf/poc-local-elk-output.conf"
I modified the /etc/systemd/system/logstash.service file to remove the "--path.settings" "/etc/logstash" arguments, and I commented out the path.config line in /etc/logstash/logstash.yml, so that the Logstash systemd service works with multiple pipelines.
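For the two-pipeline test, pipelines.yml lists both configuration files; the id and file name of the second entry in this sketch are placeholders for my Kafka-to-Elasticsearch configuration:

cat > /etc/logstash/pipelines.yml <<'EOF'
- pipeline.id: poc-local-elk-output
  path.config: "/logstash-conf/poc-local-elk-output.conf"
# placeholder id and path for the Kafka-to-Elasticsearch configuration
- pipeline.id: poc-local-kafka-input
  path.config: "/logstash-conf/poc-local-kafka-input.conf"
EOF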
My init script:
#!/usr/bin/env bash
kafka_version="2.12-1.0.0"
kafka_dir="/opt/kafka/kafka_$kafka_version/"
kafka_bin_dir="$kafka_dir/bin"
kafka_topics="$kafka_bin_dir/kafka-topics.sh"$kafka_topics --zookeeper localhost:2181 --delete --topic poc-local-input
$kafka_topics --zookeeper localhost:2181 --delete --topic poc-local-outputsleep 3
$kafka_topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic poc-local-input
$kafka_topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic poc-local-outputcurl -u elastic:elastic -XDELETE localhost:9200/poc-local-input
curl -u elastic:elastic -XDELETE localhost:9200/poc-local-outputcurl -u elastic:elastic -XPUT localhost:9200/poc-local-input
curl -u elastic:elastic -XPUT localhost:9200/poc-local-outputcurl -u elastic:elastic -XPUT localhost:9200/poc-local-input/_mapping/doc -H 'Content-Type: application/json' -d '
{
"dynamic": "false",
"properties": {
"@timestamp": {
"type": "date"
},
"org_date": {
"type": "date"
},
"lat": {
"type": "float"
},
"lon": {
"type": "float"
},
"coordinate": {
"type": "geo_point"
},
"input_latency": {
"type": "float"
}
}
}'curl -u elastic:elastic -XPUT localhost:9200/poc-local-output/_mapping/log -H 'Content-Type: application/json' -d '
{
"dynamic": "false",
"properties": {
"@timestamp": {
"type": "date"
},
"org_date": {
"type": "date"
},
"latency": {
"type": "float"
}
}
}'
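After the script has run, the topics and the mappings can be verified with the same tools:

/opt/kafka/kafka_2.12-1.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --list
curl -u elastic:elastic 'localhost:9200/poc-local-input/_mapping?pretty'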
An example of a string produced to the Kafka topic, to be injected into Elasticsearch:
1513696417052 In35 6saQ i37Z yqA2 -79.0533 -74.1763
An example of the corresponding string that should be produced by the Logstash configuration and sent to the Kafka topic:
2017-12-19T15:21:14.560Z %{host} 1513696417052 In35 6saQ i37Z yqA2 -79.0533 -74.1763
When there is already data in the Elasticsearch index and I restart Logstash, the elk-to-kafka configuration sends the existing entries to the topic before stopping.
Also, notice the %{host}, which according to the Logstash documentation should be replaced by the current host name, but is not.
From the Kafka output plugin documentation in the Logstash Reference:
The default codec is plain. Logstash will encode your events with not only the message field but also with a timestamp and hostname.
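The messages that actually land on the output topic can be inspected with the console consumer shipped with Kafka:

/opt/kafka/kafka_2.12-1.0.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic poc-local-output --from-beginning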