Hello Jeeth,
Please find both my Logstash A and Logstash B configs below.
Logstash A config (this reads the application log and sends it to a Kafka topic):
input {
  file {
    path => "/var/solr/logs/solr_slow_requests.log"
    type => "solr-slow-req-log-data"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  if [type] == "solr-slow-req-log-data" {
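    # two patterns: the first matches ERROR lines, the second parses
    # INFO/WARN slow-request entries into individual fields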
    grok {
      match => [
        "message", "%{TIMESTAMP_ISO8601:TSTAMP}%{SPACE}(?<priority>ERROR)%{SPACE}%{GREEDYDATA:ERROR_MESSAGE}",
        "message", "%{TIMESTAMP_ISO8601:TSTAMP}%{SPACE}(?:(?<priority>INFO)|(?<priority>WARN))%{SPACE}%{NOTSPACE:thread_name}%{SPACE}\[c:%{NOTSPACE:collection}%{SPACE}s:%{NOTSPACE:shard}%{SPACE}r:%{NOTSPACE:replica}%{SPACE}x:%{NOTSPACE:core}%{SPACE}%{NOTSPACE:category}%{SPACE}%{NOTSPACE:type2}:%{SPACE}\[%{NOTSPACE:collection2}]%{SPACE}webapp=(?<webapp>/solr)%{SPACE}path=%{NOTSPACE:SOLRPATH}%{SPACE}params=(?:(?<queryterms>\{\}\{.*\})|(%{NOTSPACE:queryterms}))%{SPACE}(?:(hits=%{BASE10NUM:hits}%{SPACE}status=%{BASE10NUM:status}%{SPACE}QTime=%{BASE10NUM:qtime})|(%{BASE10NUM:status}%{SPACE}%{BASE10NUM:qtime})|(status=%{BASE10NUM:status}%{SPACE}QTime=%{BASE10NUM:qtime}))"
      ]
    }
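    # use the timestamp parsed from the log line as the event's @timestamp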
    date {
      match => [ "TSTAMP", "ISO8601" ]
      target => "@timestamp"
    }
    mutate {
      gsub => [
        "thread_name", "\(", "",
        "thread_name", "\)", "",
        "core", "\]", ""
      ]
      add_field => {
        "HOSTNAME"          => "apphost1"
        "ENVIRONMENT"       => "dev"
        "APPLICATION_GROUP" => "app"
        "DATACENTER"        => "onprem"
      }
    }
    if [priority] == "ERROR" {
      mutate {
        replace => { "message" => "%{TSTAMP} ERROR %{ERROR_MESSAGE}" }
        remove_field => [ "ERROR_MESSAGE", "TSTAMP" ]
      }
    }
    if [priority] == "INFO" or [priority] == "WARN" {
      mutate {
        remove_field => [ "message", "TSTAMP" ]
      }
    }
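    # drop anything grok could not parse; note this also means the
    # _grokparsefailure branch in the output section below never fires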
    if "_grokparsefailure" in [tags] {
      drop { }
    }
  }
}
output {
  if [type] == "solr-slow-req-log-data" {
    if ("_grokparsefailure" in [tags]) {
      file {
        path => "/var/log/logstash/failure-solr-slow-requests-log-%{+YYYY-MM-dd}"
      }
    } else {
      kafka {
        bootstrap_servers => "kafkahost1:9092,kafkahost2:9092,kafkahost3:9092"
        topic_id => "solr-slow-requests-log"
      }
      # file {
      #   path => "/var/log/logstash/success-solr-slow-log-data-%{+YYYY-MM-dd}"
      # }
      # stdout {
      #   codec => "rubydebug"
      # }
    }
  }
}
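Note that Logstash B's output conditionals route on the type field set by the file input above, and that field only survives the trip through Kafka because this generation of the kafka output (and the matching input) defaults to the json codec, which serializes the whole event. Making that explicit (illustrative only, same behavior as the default; worth verifying against your plugin version) would look like:
      kafka {
        bootstrap_servers => "kafkahost1:9092,kafkahost2:9092,kafkahost3:9092"
        topic_id => "solr-slow-requests-log"
        codec => json    # plugin default: whole event as JSON, so fields like type are kept
      }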
Logstash B config (this reads from the Kafka topics and writes to Elasticsearch):
All topics are consumed under the same "logstash" consumer group; an explicit version of one input is sketched after the input block.
input {
  # type is set at the client
  kafka {
    topic_id => "wildfly-app-availability-data"
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "wildfly-app-performance-data"
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "haproxy-log-data"
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "solr-log-data"
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "solr-slow-requests-log"
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "wildfly-app-exception-data"
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
  }
}
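For reference, the grouping mentioned above is implicit: group_id in this generation of the kafka input defaults to "logstash", so all six inputs join the same consumer group. Spelled out for one input (illustrative only; consumer_threads is shown at its default), it would be:
  kafka {
    topic_id => "solr-slow-requests-log"
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
    group_id => "logstash"        # plugin default, shared by all six inputs above
    consumer_threads => 1         # plugin default
  }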
output {
  if [type] == "wildfly-app-availability-data" {
    elasticsearch {
      index => "wfaad-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => "false"
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "wfaad"
    }
  }
  if [type] == "wildfly-app-performance-data" {
    elasticsearch {
      index => "wfapd-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => "false"
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "wfapd"
    }
  }
  if [type] == "wildfly-app-exception-data" {
    elasticsearch {
      index => "wfaed-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => "false"
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "wfaed"
    }
  }
  if [type] == "haproxy-log-data" {
    elasticsearch {
      index => "haproxy-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => "true"
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "haproxy"
    }
  }
  if [type] == "solr-log-data" {
    elasticsearch {
      index => "solr-log-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => "true"
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "solr-log"
    }
  }
  if [type] == "solr-slow-requests-log" {
    #file {
    #  path => "/var/log/logstash/solr-file-out-%{+YYYY-MM-dd}"
    #  codec => rubydebug
    #}
    elasticsearch {
      index => "solr-slow-requests-log-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => "true"
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "solr-slow-requests-log"
    }
  }
}
To make sure the logs are getting into Logstash, I temporarily had Logstash A write this topic's events to a local file as well, and Logstash created those files without issue; it is only from this topic that Logstash B is unable to read and write to Elasticsearch.
If it were an Elasticsearch issue, I would expect to see something mentioning the topic name (i.e., the index name) in the Elasticsearch log, but I haven't seen anything like that, so I assume the problem is with Logstash, or possibly with Kafka. In the Logstash debug output I am seeing ":events_received=>0" for the "solr-slow-requests-log" config, while the other configs show a non-zero events_received, which I assume is the count of messages received.
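To check whether the messages are actually reaching the topic and are readable by a consumer, a minimal throwaway consumer config could be run separately, something like the sketch below (the group_id "solr-slow-debug" is made up so that the "logstash" group's offsets are left untouched; reset_beginning together with auto_offset_reset re-reads the topic from the earliest message, so verify both options against your logstash-input-kafka version):
input {
  kafka {
    topic_id => "solr-slow-requests-log"
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
    group_id => "solr-slow-debug"      # hypothetical group; does not disturb the "logstash" group
    auto_offset_reset => "smallest"    # required for reset_beginning to take effect
    reset_beginning => true            # clear this group's offsets and start from the beginning
  }
}
output {
  stdout { codec => rubydebug }
}
If this prints events, the messages are in the topic and the problem is on the Logstash B side; if it prints nothing, the messages are never reaching the topic in the first place.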