Recently I integrated an ELK pipeline consisting of Kafka, Logstash, Elasticsearch and Kibana. I already had a working Kafka cluster (MSK) and Elasticsearch (7.6), both in AWS, so this blog only describes the steps involved in installing and configuring Logstash. Note that MSK is SSL-enabled and its brokers communicate only on TCP port 9094.
Details of the instances:
Elasticsearch (7.6)
vpc-xx-yyyy-8m6ru9hefy5uwq2xtlbhpspr8q.us-east-1.es.amazonaws.com
Zookeeper
z-1.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-3.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181
Brokers
b-1.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:9094,b-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:9094
EC2 instance
10.10.10.07
Kibana
https://vpc-xx-yyyy-8m6ru9hefy5uwq2xtlbhpspr8q.us-east-1.es.amazonaws.com/_plugin/kibana/
Steps involved in the integration:
Create a topic in MSK from your EC2 instance
From your ../kafka_2.12-2.4.1/bin directory, run the command:
./kafka-topics.sh --create --zookeeper z-1.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-3.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181 --replication-factor 2 --partitions 1 --topic WORLDNEWS
This command creates a topic named "WORLDNEWS" in MSK.
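To confirm the topic exists before moving on, a describe call can be run from the same bin directory. This is only a minimal sketch: the ZK and TOPIC variable names are my own, and the fallback message is there just so the snippet fails gently if the cluster cannot be reached.

```shell
# Zookeeper connection string, taken from the instance details above
ZK="z-1.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181,z-3.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:2181"
TOPIC="WORLDNEWS"

# Show partition count, replication factor and replica assignment for the topic
./kafka-topics.sh --describe --zookeeper "$ZK" --topic "$TOPIC" \
  || echo "Could not describe $TOPIC; check the Zookeeper string and security groups" >&2
```

With the create command above, the output should report one partition and a replication factor of 2.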
To connect to MSK from EC2, you need to do the following:
The Kafka client on EC2 uses the JVM truststore to talk to the MSK cluster over SSL. To set this up, first make sure the /tmp folder exists on the EC2 machine. Then run the following command, replacing JDKFolder with the name of your JDK folder. For example, the name of the JDK folder on your instance might be java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64. For more information, see https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html
cp /usr/lib/jvm/JDKFolder/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
Install Logstash on the EC2 box
Follow the steps outlined in https://www.elastic.co/guide/en/logstash/current/installing-logstash.html to install Logstash.
Install the Logstash output plugin amazon_es on the EC2 instance
Run the following command from your Logstash bin directory:
./logstash-plugin install logstash-output-amazon_es
Configure Logstash
Create a .conf file that pulls events from Kafka (MSK) and sends them to Elasticsearch.
The input plugin connects to Kafka (MSK).
The output plugin connects to Elasticsearch.
Create a config file named elk.conf on the EC2 box under /etc/logstash.
The contents of the file should be:
input {
  kafka {
    group_id => "test-consumer-group"
    topics => ["WORLDNEWS"]
    bootstrap_servers => "b-1.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:9094,b-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:9094"
    codec => "json"
    security_protocol => "SSL"
    ssl_truststore_location => "/tmp/kafka.client.truststore.jks"
  }
}
output {
  amazon_es {
    hosts => ["vpc-xx-yyyy-8m6ru9hefy5uwq2xtlbhpspr8q.us-east-1.es.amazonaws.com"]
    region => "us-east-1"
    index => "production-logs-%{+YYYY.MM.dd}"
    aws_access_key_id => ''
    aws_secret_access_key => ''
  }
}
Note:
You might need the access key and secret key of static IAM credentials that have access to the Amazon Elasticsearch domain. These allow Logstash to write to ES. Without the proper permissions you will get a 403 error.
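Before starting Logstash for real, the file can be syntax-checked. The sketch below uses Logstash's --config.test_and_exit flag, which checks the configuration for valid syntax and then exits. It assumes you run it from the Logstash bin directory; CONF is a variable name of my choosing.

```shell
CONF="/etc/logstash/elk.conf"

# Check the configuration syntax and exit without running the pipeline
./logstash -f "$CONF" --config.test_and_exit \
  || echo "Config check failed, or logstash was not found in this directory" >&2
```

A typographic quote pasted in from a web page is the kind of slip this check is meant to catch.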
Send data to MSK from EC2 (Producer)
From the EC2 instance, in your /opt/kafka_2.12-2.4.1/bin directory, run the following command:
./kafka-console-producer.sh --broker-list b-2.xx-yyyy.78tshs.e4.kafka.us-east-1.amazonaws.com:9094 --producer.config client.properties --topic WORLDNEWS
Using this command you can send messages by typing into the command line.
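The producer command refers to a client.properties file that is not shown above. Going by the AWS guide linked earlier, a minimal version would contain the two SSL settings below. Treat the exact contents as an assumption and adjust them if your cluster uses a different authentication setup. The file is written to the current directory, which matches the relative path used in the command.

```shell
# Write a minimal client.properties next to the Kafka scripts (assumed contents)
cat > client.properties <<'EOF'
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
EOF

# Print the file back to confirm what was written
cat client.properties
```

Because the Logstash input uses the json codec, typing one JSON object per line (for example {"headline": "test"}, where the field name is purely illustrative) keeps the events parseable. Plain-text lines are still indexed, but Logstash tags them with _jsonparsefailure.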
Start Logstash from the command line
From the Logstash bin directory, run:
./logstash -f /etc/logstash/elk.conf
Log in to your Kibana URL.
Create an index pattern such as production-logs-* so that it matches the index name set in elk.conf, and you will see messages coming into Kibana via ES.
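If nothing shows up in Kibana, it can help to ask Elasticsearch directly whether today's index exists. This sketch assumes the domain's access policy lets this instance make unsigned requests; if the domain requires IAM-signed requests, plain curl will get a 403 and a signing client is needed instead. ES_HOST and INDEX are illustrative variable names.

```shell
ES_HOST="vpc-xx-yyyy-8m6ru9hefy5uwq2xtlbhpspr8q.us-east-1.es.amazonaws.com"

# Logstash expands %{+YYYY.MM.dd} from the event's @timestamp, which is in UTC
INDEX="production-logs-$(date -u +%Y.%m.%d)"
echo "Looking for index: $INDEX"

# _cat/indices lists matching indices along with their document counts
curl -s --max-time 10 "https://$ES_HOST/_cat/indices/$INDEX?v" \
  || echo "Request failed; check network access to the domain" >&2
```

An index that exists with a growing docs.count means Logstash is writing and the problem is on the Kibana side, usually the index pattern.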