Logstash not forwarding messages to Elasticsearch but able to write to file locally

I'm facing a strange issue, and I'm new to the Elastic Stack. This is my flow:

Logstash A (input: application logs, output: Kafka topics) => Kafka (all topics consumed by the logstash consumer group) => Logstash B (input: Kafka, output: Elasticsearch) => Kibana

I already have 5 topics being written to Elasticsearch by Logstash B. I recently created a new topic and added it to the same consumer group (logstash), but messages from the new topic are not being forwarded to Elasticsearch; the previous topics are working fine.

I'm not seeing any errors in either Logstash or Elasticsearch. So far I have checked the following:

  1. The Logstash configuration is okay, and there are no errors when I run Logstash in verbose mode.
  2. I restarted the Elasticsearch cluster (it is an 8-node cluster).

Please share your Logstash conf.

Hello Jeeth,

Please find both my Logstash A and Logstash B configs below.

Logstash A config (this reads the application log and sends it to a Kafka topic):
input {

  file {
    path => "/var/solr/logs/solr_slow_requests.log"
    type => "solr-slow-req-log-data"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }

}

filter {

  if [type] == "solr-slow-req-log-data" {
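
    # Hypothetical example of a slow-request line that the second grok
    # pattern below is written to match (all values made up for illustration):
    #   2016-05-10T12:34:56,789 INFO (qtp123-45) [c:products s:shard1 r:core_node1 x:products_shard1_replica1] org.apache.solr.core.SolrCore Request: [products] webapp=/solr path=/select params={q=*:*&rows=10} hits=100 status=0 QTime=1234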

    grok {
      match => [
        "message", "%{TIMESTAMP_ISO8601:TSTAMP}%{SPACE}(?<priority>ERROR)%{SPACE}%{GREEDYDATA:ERROR_MESSAGE}",
        "message", "%{TIMESTAMP_ISO8601:TSTAMP}%{SPACE}(?:(?<priority>INFO)|(?<priority>WARN))%{SPACE}%{NOTSPACE:thread_name}%{SPACE}\[c:%{NOTSPACE:collection}%{SPACE}s:%{NOTSPACE:shard}%{SPACE}r:%{NOTSPACE:replica}%{SPACE}x:%{NOTSPACE:core}%{SPACE}%{NOTSPACE:category}%{SPACE}%{NOTSPACE:type2}:%{SPACE}\[%{NOTSPACE:collection2}]%{SPACE}webapp=(?<webapp>/solr)%{SPACE}path=%{NOTSPACE:SOLRPATH}%{SPACE}params=(?:(?<queryterms>\{\}\{.*\})|(%{NOTSPACE:queryterms}))%{SPACE}(?:(hits=%{BASE10NUM:hits}%{SPACE}status=%{BASE10NUM:status}%{SPACE}QTime=%{BASE10NUM:qtime})|(%{BASE10NUM:status}%{SPACE}%{BASE10NUM:qtime})|(status=%{BASE10NUM:status}%{SPACE}QTime=%{BASE10NUM:qtime}))"
      ]
    }

    date {
      match => [ "TSTAMP", "ISO8601" ]
      target => "@timestamp"
    }

    mutate {
      gsub => [
        "thread_name", "\(", "",
        "thread_name", "\)", "",
        "core", "\]", ""
      ]
      add_field => {
        "HOSTNAME"          => "apphost1"
        "ENVIRONMENT"       => "dev"
        "APPLICATION_GROUP" => "app"
        "DATACENTER"        => "onprem"
      }
    }

    if [priority] == "ERROR" {
      mutate {
        replace => { "message" => "%{TSTAMP} ERROR %{ERROR_MESSAGE}" }
        remove_field => [ "ERROR_MESSAGE" ]
        remove_field => [ "TSTAMP" ]
      }
    }

    if [priority] == "INFO" or [priority] == "WARN" {
      mutate {
        remove_field => [ "message" ]
        remove_field => [ "TSTAMP" ]
      }
    }
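
    # Note: events tagged _grokparsefailure are dropped here, so the
    # "_grokparsefailure" file output in the output section below can
    # never receive any events.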

    if "_grokparsefailure" in [tags] {
      drop { }
    }

  }

}

output {

  if [type] == "solr-slow-req-log-data" {

    if ("_grokparsefailure" in [tags]) {
      file {
        path => "/var/log/logstash/failure-solr-slow-requests-log-%{+YYYY-MM-dd}"
      }

    } else {

      kafka {
        bootstrap_servers => "kafkahost1:9092,kafkahost2:9092,kafkahost3:9092"
        topic_id => "solr-slow-requests-log"
      }

#      file {
#        path => "/var/log/logstash/success-solr-slow-log-data-%{+YYYY-MM-dd}"
#      }
      #stdout {
      #  codec => "rubydebug"
      #}

    }

  }

}

Logstash B config (input: Kafka, output: Elasticsearch):
All topics are consumed under the logstash consumer group.

input {

  # type is set at the client
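  # No group_id is set on these kafka inputs, so (assuming the old
  # zk_connect-based kafka input plugin in use here) they all join the
  # plugin's default consumer group, "logstash".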

  kafka {
    topic_id => "wildfly-app-availability-data"
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "wildfly-app-performance-data"
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "haproxy-log-data"
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "solr-log-data"
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "solr-slow-requests-log"
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
  }
  kafka {
    topic_id => "wildfly-app-exception-data"
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
  }

}

output {

  if [type] == "wildfly-app-availability-data" {

    elasticsearch {
      index => "wfaad-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => false
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "wfaad"
    }

  }

  if [type] == "wildfly-app-performance-data" {

    elasticsearch {
      index => "wfapd-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => false
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "wfapd"
    }

  }

  if [type] == "wildfly-app-exception-data" {

    elasticsearch {
      index => "wfaed-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => false
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "wfaed"
    }

  }

  if [type] == "haproxy-log-data" {

    elasticsearch {
      index => "haproxy-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => true
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "haproxy"
    }

  }

  if [type] == "solr-log-data" {

    elasticsearch {
      index => "solr-log-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => true
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "solr-log"
    }

  }

  if [type] == "solr-slow-requests-log" {

    #file {
    #  path => "/var/log/logstash/solr-file-out-%{+YYYY-MM-dd}"
    #  codec => rubydebug
    #}

    elasticsearch {
      index => "solr-slow-requests-log-%{+YYYY.MM.dd}"
      hosts => ["elasticsearch-3"]
      manage_template => true
      workers => 2
      flush_size => 2000
      idle_flush_time => 30
      document_type => "solr-slow-requests-log"
    }

  }

}

To make sure the logs are getting into Logstash, I also had this topic's events written to a local file; Logstash creates those files without issue. It is only reading from this topic and writing to Elasticsearch that produces nothing.
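
(For reference, a minimal sketch of that check, assuming it was done by re-enabling the commented-out file output in the Logstash A config above, alongside the kafka output:)

      file {
        path => "/var/log/logstash/success-solr-slow-log-data-%{+YYYY-MM-dd}"
      }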

If it were an Elasticsearch issue, I would expect to see something related to the topic name (I mean the index name) in the Elasticsearch logs, but I haven't seen anything like that, so I'm assuming the problem is with Logstash, or possibly with Kafka. In the Logstash output I'm seeing ":events_received=>0" for the "solr-slow-requests-log" config, while the other configs show a non-zero events_received, which I assume is the count of messages received.

Can you remove the config below, reload the pipeline, and check the output?
if "_grokparsefailure" in [tags] { drop { } }

Hello Jeeth,

I have partially figured out the issue; there is a problem on the Kafka side:

  1. On the Logstash B side, I created a new config file with only one topic and started it; the topic was automatically added to the logstash consumer group.
  2. I have a logstash consumer group in Kafka, and any topic mentioned as a Kafka input gets created in Kafka.
  3. I defined the index solr-slow-requests-log-%{+YYYY.MM.dd} in the new (full) config file and restarted Kafka. This created the topic in Kafka, but the index was still not being created. So I changed the topic name (to test-topic), created a new consumer group for it (test-topic-group), set the new group_id in the Logstash config (see the sketch below), and kept the index name the same. This worked: since the topic no longer belongs to the logstash group, the index is created and I can see the logs in the Kibana dashboard. But I still don't know why messages are not pushed to Elasticsearch when the topic is in the logstash group.
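
For reference, a minimal sketch of the Logstash B input that ended up working, using the same old-style kafka input options as the configs above (test-topic and test-topic-group are the placeholder names from step 3):

input {
  kafka {
    topic_id   => "test-topic"
    # dedicated consumer group instead of the plugin's default "logstash" group
    group_id   => "test-topic-group"
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
  }
}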
