Logstash pulling messages from Kafka

Hi team,
I have 3 Kafka brokers and 2 topics, each with 15 partitions. I also have an Elasticsearch cluster of 5 nodes, and each node runs 2 Logstash configs that pull messages from the 2 Kafka topics. However, Logstash is not reading messages from all the partitions, and there is also a delay in writing messages to Elasticsearch. The two configs are:
input {
  kafka {
    zk_connect => ':2181,:2181,:2181'
    topic_id => 'collectdmetric'
    consumer_threads => 3
  }
}

output {
  if [type] == 'collectd' {
    elasticsearch {
      hosts => [":9200",":9200",":9200"]
      manage_template => false
      index => "collectdmetrics-%{+YYYY.MM.dd}"
    }
  }
}

input {
  kafka {
    zk_connect => ':2181,:2181,:2181'
    topic_id => 'tomcat'
    consumer_threads => 3
  }
}

output {
  if [type] == 'tomcat' {
    elasticsearch {
      hosts => [":9200",":9200",":9200"]
      manage_template => false
      index => "tomcat-%{+YYYY.MM.dd}"
    }
  }
}
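
As a sanity check on the consumer thread counts, here is a minimal Python sketch of the arithmetic. It assumes (the post does not confirm this) that the Logstash instances on all 5 nodes join the same consumer group for a topic, and that partitions are spread as evenly as possible across threads. The function name partitions_per_thread is hypothetical, and the real assignment is decided by Kafka, not by this code.

```python
def partitions_per_thread(partitions, instances, threads_per_instance):
    """Count how many partitions each consumer thread would own,
    assuming one shared consumer group and an even spread."""
    total_threads = instances * threads_per_instance
    base, extra = divmod(partitions, total_threads)
    # The first `extra` threads take one partition more than the rest.
    return [base + 1 if i < extra else base for i in range(total_threads)]


# Setup described in the post: 15 partitions, 5 nodes, consumer_threads => 3
counts = partitions_per_thread(15, 5, 3)
idle_threads = counts.count(0)
print(counts, idle_threads)
```

Under these assumptions, 5 instances with 3 threads each give 15 threads for 15 partitions, so every partition should have a reader. If the instances are not actually in the same group, or fewer instances are running than expected, the picture changes.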
health status index                      pri rep docs.count docs.deleted store.size pri.store.size
green  open   collectdmetrics-2016.05.03   5   1   98612060            0       37gb         18.5gb
green  open   tomcat-2016.05.03            5   1    9861200            0        6gb            3gb
Can someone please point me in the right direction?

I think this question might be better answered by people over in the Logstash group.