Recently I integrated a pipeline consisting of Kafka (AWS MSK), Logstash, Elasticsearch, and Kibana. I already had a working MSK cluster and an Elasticsearch (7.6) domain in AWS, so this post only describes the steps involved in installing and configuring Logstash. Note that the MSK cluster is SSL-enabled and communicates only on TCP port 9094.

Details of the instances:

Elasticsearch (7.6)
ZooKeeper: z-1.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-3.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181
EC2 instance

Steps involved in the integration:

Create a topic in MSK from your EC2 instance

From your ../kafka_2.12-2.4.1/bin directory, run the following command:
./kafka-topics.sh --create --zookeeper z-1.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-3.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181 --replication-factor 2 --partitions 1 --topic WORLDNEWS

This command creates a topic named "WORLDNEWS" in MSK.

To connect to MSK from EC2, you need to do the following.

The JVM truststore on the EC2 instance needs to trust the MSK cluster's certificate. To set this up, first create a folder named /tmp on the EC2 machine if it does not already exist. Then run the following command, replacing JDKFolder with the name of your JDK folder. For example, the name of the JDK folder on your instance might be java-1.8.0-openjdk-. For more information, see https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html

cp /usr/lib/jvm/JDKFolder/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks

Install Logstash on the EC2 box

Follow the steps outlined in https://www.elastic.co/guide/en/logstash/current/installing-logstash.html to install Logstash.

Install the Logstash output plugin amazon_es on the EC2 instance

Run the following command from Logstash's /bin directory:

./logstash-plugin install logstash-output-amazon_es

Configure Logstash

Create a .conf file that pulls events from Kafka (MSK) and sends them to Elasticsearch:
The input plugin connects to Kafka MSK
The output plugin connects to Elasticsearch

Create a config file named elk.conf on the EC2 box under /etc/logstash.

The contents of the file should be:

input {
  kafka {
    group_id => "test-consumer-group"
    topics => ["WORLDNEWS"]
    bootstrap_servers => "b-1.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:9094,b-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:9094"
    codec => "json"
    security_protocol => "SSL"
    ssl_truststore_location => "/tmp/kafka.client.truststore.jks"
  }
}

output {
  amazon_es {
    hosts => ["vpc-xx-yyyy-8m6ru9hefy5uwq2xtlbhpspr8q.us-east-1.es.amazonaws.com"]
    region => "us-east-1"
    index => "production-logs-%{+YYYY.MM.dd}"
    aws_access_key_id => ''
    aws_secret_access_key => ''
  }
}

You might need the access key and secret key of a static IAM role that is allowed to write to the Amazon Elasticsearch domain; this is what lets Logstash write to ES. Without the proper permissions you will get a 403 error.
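The index setting above uses Logstash's sprintf date syntax: %{+YYYY.MM.dd} expands from each event's @timestamp (in UTC), so events are bucketed into one index per day. A minimal Python sketch of the resulting index names (the daily_index helper is illustrative, not part of Logstash):

```python
from datetime import date

def daily_index(d: date) -> str:
    # Mirrors Logstash's "production-logs-%{+YYYY.MM.dd}" sprintf pattern:
    # YYYY = 4-digit year, MM = 2-digit month, dd = 2-digit day.
    return f"production-logs-{d:%Y.%m.%d}"

print(daily_index(date(2020, 4, 1)))  # production-logs-2020.04.01
```

One index per day keeps individual indices small and makes it easy to delete or archive old logs by date.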

Send data to MSK from EC2 (Producer)

On the EC2 instance, from your /opt/kafka_2.12-2.4.1/bin directory, run the following command:

./kafka-console-producer.sh --broker-list b-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:9094 --producer.config client.properties --topic WORLDNEWS

With this command running, you can send messages by typing them into the command line.
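The client.properties file passed via --producer.config is not shown in the steps above; a minimal version for an SSL-only MSK listener, assuming the truststore copied to /tmp earlier, could look like this:

```properties
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
```

Also note that because the Logstash input above uses codec => "json", each line you type into the producer should be a valid JSON object, for example {"headline": "hello world"}.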

Start Logstash from the command line

From Logstash's bin directory, run:
./logstash -f /etc/logstash/elk.conf

Log into your Kibana URL

Create an index pattern matching production-logs-*
and you will see messages coming into Kibana via ES.
