Hi, I am trying to set up a Logstash pipeline from Kafka to Elasticsearch.
The Logstash logs show an "Attempted to resurrect connection to dead ES instance" error:
[logstash.outputs.elasticsearch][main] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"http://elastic:xxxxxx@elasticsearch:9200/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :message=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ClientProtocolException] elasticsearch:9200 failed to respond"}
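To narrow this down, the plain-HTTP request that the output plugin is making can be reproduced by hand from inside the Logstash container (assuming curl is available in the image), and the HTTPS endpoint can be checked from inside the Elasticsearch container using the CA that the setup service generates:

  docker exec -it logstash curl -v http://elasticsearch:9200
  docker compose exec elasticsearch curl -s --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" https://localhost:9200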
I have everything set up using Docker. Here is the docker-compose.yml:
version: "3.8"
services:
  setup:
    image: docker.elastic.co/elasticsearch/elasticsearch:${ELK_STACK_VERSION}
    container_name: setup
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
    user: "0"
    command: >
      bash -c '
        if [ x${ELASTIC_PASSWORD} == x ]; then
          echo "Set the ELASTIC_PASSWORD environment variable in the .env file";
          exit 1;
        elif [ x${KIBANA_PASSWORD} == x ]; then
          echo "Set the KIBANA_PASSWORD environment variable in the .env file";
          exit 1;
        fi;
        if [ ! -f config/certs/ca.zip ]; then
          echo "Creating CA";
          bin/elasticsearch-certutil ca --silent --pem -out config/certs/ca.zip;
          unzip config/certs/ca.zip -d config/certs;
        fi;
        if [ ! -f config/certs/certs.zip ]; then
          echo "Creating certs";
          echo -ne \
          "instances:\n"\
          "  - name: elasticsearch\n"\
          "    dns:\n"\
          "      - elasticsearch\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          > config/certs/instances.yml;
          bin/elasticsearch-certutil cert --silent --pem -out config/certs/certs.zip --in config/certs/instances.yml --ca-cert config/certs/ca/ca.crt --ca-key config/certs/ca/ca.key;
          unzip config/certs/certs.zip -d config/certs;
        fi;
echo "Setting file permissions"
        chown -R root:root config/certs;
        find . -type d -exec chmod 750 \{\} \;;
        find . -type f -exec chmod 640 \{\} \;;
        echo "Waiting for Elasticsearch availability";
        until curl -s --cacert config/certs/ca/ca.crt https://elasticsearch:9200 | grep -q "missing authentication credentials"; do sleep 30; done;
        echo "Setting kibana_system password";
        until curl -s -X POST --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" -H "Content-Type: application/json" https://elasticsearch:9200/_security/user/kibana_system/_password -d "{\"password\":\"${KIBANA_PASSWORD}\"}" | grep -q "^{}"; do sleep 10; done;
        echo "All done!";
      '
    healthcheck:
      test: ["CMD-SHELL", "[ -f config/certs/elasticsearch/elasticsearch.crt ]"]
      interval: 1s
      timeout: 5s
      retries: 120
  zookeeper:
    image: confluentinc/cp-zookeeper:${CONFLUENTINC_STACK_VERSION}
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    depends_on:
      - zookeeper
    image: confluentinc/cp-kafka:${CONFLUENTINC_STACK_VERSION}
    container_name: kafka
    ports:
      - 9092:9092
      - 9101:9101
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
  elasticsearch:
    depends_on:
      setup:
        condition: service_healthy
    image: docker.elastic.co/elasticsearch/elasticsearch:${ELK_STACK_VERSION}
    container_name: elasticsearch
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
      - elasticsearchdata:/usr/share/elasticsearch/data
    ports:
      - 127.0.0.1:9200:9200
    environment:
      - network.host=0.0.0.0
      - node.name=elasticsearch
      - cluster.name=docker-cluster
      - cluster.initial_master_nodes=elasticsearch
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=certs/elasticsearch/elasticsearch.key
      - xpack.security.http.ssl.certificate=certs/elasticsearch/elasticsearch.crt
      - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.key=certs/elasticsearch/elasticsearch.key
      - xpack.security.transport.ssl.certificate=certs/elasticsearch/elasticsearch.crt
      - xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.license.self_generated.type=basic
    mem_limit: ${MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120
  kibana:
    depends_on:
      elasticsearch:
        condition: service_healthy
    image: docker.elastic.co/kibana/kibana:${ELK_STACK_VERSION}
    container_name: kibana
    volumes:
      - certs:/usr/share/kibana/config/certs
      - kibanadata:/usr/share/kibana/data
    ports:
      - 5601:5601
    environment:
      - SERVERNAME=kibana
      - ELASTICSEARCH_HOSTS=https://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD}
      - ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES=config/certs/ca/ca.crt
    mem_limit: ${MEM_LIMIT}
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120
  logstash:
    depends_on:
      - kafka
      - elasticsearch
    image: docker.elastic.co/logstash/logstash:${ELK_STACK_VERSION}
    container_name: logstash
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    environment:
      - http.host=0.0.0.0
      - ELASTICSEARCH_HOST=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=logstash_system
      - ELASTICSEARCH_PASSWORD=${LOGSTASH_PASSWORD}
    command: >
      logstash -f /usr/share/logstash/pipeline/logstash.conf
volumes:
  certs:
    driver: local
  elasticsearchdata:
    driver: local
  kibanadata:
    driver: local
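For reference, the compose file expects an .env file next to it that defines the variables referenced above; the values below are illustrative placeholders, not my real ones:

  ELK_STACK_VERSION=8.6.2
  CONFLUENTINC_STACK_VERSION=7.3.2
  ELASTIC_PASSWORD=changeme
  KIBANA_PASSWORD=changeme
  LOGSTASH_PASSWORD=changeme
  MEM_LIMIT=1073741824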
And here is the logstash.conf:
input {
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ["topic"]
    group_id => "logstash_consumer_group"
    codec => "json"
    decorate_events => true
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "password"
    index => "index"
    document_id => "%{[@metadata][fingerprint]}"
  }
}
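To feed the pipeline a test event, a JSON message can be produced on the internal listener with the console producer that ships in the cp-kafka image:

  docker exec -it kafka kafka-console-producer --bootstrap-server kafka:29092 --topic topic

and then typing a line such as {"message": "hello"} at the prompt.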
Both the docker-compose.yml file and logstash.conf are in the same directory.
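My current suspicion is a protocol mismatch: Elasticsearch only serves HTTPS (xpack.security.http.ssl.enabled=true), while the output above talks plain HTTP, which would fit the Manticore "failed to respond" message. If that is the cause, I assume the fix is to mount the certs volume into the Logstash container as well and point the output at HTTPS, roughly like this (the /usr/share/logstash/certs mount path is my own choice here, not something already in the setup; ssl and cacert are the options I understand the elasticsearch output plugin to accept):

  output {
    elasticsearch {
      hosts => ["https://elasticsearch:9200"]
      user => "elastic"
      password => "password"
      ssl => true
      cacert => "/usr/share/logstash/certs/ca/ca.crt"
      index => "index"
      document_id => "%{[@metadata][fingerprint]}"
    }
  }

Does that look right, or am I missing something else?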