ELK with Kafka

Hi ,

I am trying to introduce Kafka in ELK. I have 2 servers
On first, configured ELK , filebeat and kafka input plugin
On second, installed Kafka, logstash, kafka output plugin

I am trying to push my second machine's log into kafka and then into elasticsearch which is on first machine.

Somehow filebeat is working fine but using kafka its not working. Here are the input, output files

first machine which is ELK server -

[root@watlelk01 conf.d]# cat 02-beats-input.conf
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
}
}

[root@watlelk01 conf.d]# cat 10-syslog-filter.conf
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}

[root@watlelk01 conf.d]# cat 20-logstash-logs-kafka-consumer.conf
input {
kafka {
zk_connect => '9.2.153.25:2181'
topic_id => 'logstash_logs'
}
}

output {
stdout { codec => rubydebug }
}
[root@watlelk01 conf.d]# cat 30-elasticsearch-output.conf
output {
elasticsearch {
hosts => ["localhost:9200"]
sniffing => true
manage_template => false
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}

On second i.e. kafka machine -

[root@watlelkyz00 conf.d]# ls
10-logstash-logs-kafka-producer.conf 20-syslog-filter.conf
[root@watlelkyz00 conf.d]# cat 10-logstash-logs-kafka-producer.conf
input {
file {
path => ["/var/logs//.log"]
}
}

output {
kafka {
bootstrap_servers => 'localhost:9092'
topic_id => 'logstash_logs'
}
}
[root@watlelkyz00 conf.d]# cat 20-syslog-filter.conf
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}