Hello Jeeth,
Please find both my Logstash A and Logstash B configs below.
Logstash A config (this reads the application log and sends it to a Kafka topic):
input {
  # Tail the Solr slow-request log on the application host.
  file {
    # "type" is the routing key tested by the filter and output sections.
    type           => "solr-slow-req-log-data"
    path           => "/var/solr/logs/solr_slow_requests.log"
    # sincedb is pointed at /dev/null, so no read position is persisted
    # and the file is read from the beginning.
    sincedb_path   => "/dev/null"
    start_position => "beginning"
  }
}
filter {
  # Parse Solr slow-request log lines into structured fields.
  if [type] == "solr-slow-req-log-data" {
    # Two layouts are tried against "message":
    #   1. ERROR lines: timestamp, priority, free-form remainder.
    #   2. INFO/WARN request lines: thread, collection/shard/replica/core,
    #      request path, params, and status/QTime (with optional hits).
    grok {
      match => [
        "message", "%{TIMESTAMP_ISO8601:TSTAMP}%{SPACE}(?<priority>ERROR)%{SPACE}%{GREEDYDATA:ERROR_MESSAGE}",
        "message", "%{TIMESTAMP_ISO8601:TSTAMP}%{SPACE}(?:(?<priority>INFO)|(?<priority>WARN))%{SPACE}%{NOTSPACE:thread_name}%{SPACE}\[c:%{NOTSPACE:collection}%{SPACE}s:%{NOTSPACE:shard}%{SPACE}r:%{NOTSPACE:replica}%{SPACE}x:%{NOTSPACE:core}%{SPACE}%{NOTSPACE:category}%{SPACE}%{NOTSPACE:type2}:%{SPACE}\[%{NOTSPACE:collection2}]%{SPACE}webapp=(?<webapp>/solr)%{SPACE}path=%{NOTSPACE:SOLRPATH}%{SPACE}params=(?:(?<queryterms>\{\}\{.*\})|(%{NOTSPACE:queryterms}))%{SPACE}(?:(hits=%{BASE10NUM:hits}%{SPACE}status=%{BASE10NUM:status}%{SPACE}QTime=%{BASE10NUM:qtime})|(%{BASE10NUM:status}%{SPACE}%{BASE10NUM:qtime})|(status=%{BASE10NUM:status}%{SPACE}QTime=%{BASE10NUM:qtime}))"
      ]
    }

    # Use the timestamp from the log line as the event time.
    date {
      match  => [ "TSTAMP", "ISO8601" ]
      target => "@timestamp"
    }

    mutate {
      # Strip the parentheses captured around the thread name and the
      # trailing "]" captured with the core name.
      gsub => [
        "thread_name", "\(", "",
        "thread_name", "\)", "",
        "core", "\]", ""
      ]
      # Static metadata describing where this event came from.
      add_field => {
        "HOSTNAME"          => "apphost1"
        "ENVIRONMENT"       => "dev"
        "APPLICATION_GROUP" => "app"
        "DATACENTER"        => "onprem"
      }
    }

    if [priority] == "ERROR" {
      # Rebuild a compact message, then discard the intermediate captures.
      mutate {
        replace      => { "message" => "%{TSTAMP} ERROR %{ERROR_MESSAGE}" }
        remove_field => [ "ERROR_MESSAGE", "TSTAMP" ]
      }
    } else if [priority] == "INFO" or [priority] == "WARN" {
      # Every part of the line is now in its own field; drop the raw text.
      mutate {
        remove_field => [ "message", "TSTAMP" ]
      }
    }

    # Events tagged "_grokparsefailure" are deliberately NOT dropped here.
    # The output section routes them to a dated failure file; dropping them
    # in the filter made that branch unreachable and hid unparsed lines.
  }
}
output {
  if [type] == "solr-slow-req-log-data" {
    if "_grokparsefailure" not in [tags] {
      # Successfully parsed events are published to Kafka.
      kafka {
        topic_id          => "solr-slow-requests-log"
        bootstrap_servers => "kafkahost1:9092,kafkahost2:9092,kafkahost3:9092"
      }
      # Optional debug sinks, disabled:
      # file {
      #   path => "/var/log/logstash/success-solr-slow-log-data-%{+YYYY-MM-dd}"
      # }
      # stdout {
      #   codec => "rubydebug"
      # }
    } else {
      # Unparsed lines go to a dated local file for inspection. This is only
      # reached if the filter stage has not already dropped such events.
      file {
        path => "/var/log/logstash/failure-solr-slow-requests-log-%{+YYYY-MM-dd}"
      }
    }
  }
}
Logstash B config (input: Kafka --> output: Elasticsearch):
All topics are grouped into the logstash group.
input {
  # One Kafka consumer per topic. The event "type" field is not assigned
  # here; it is set by the shipping client.
  # NOTE(review): "zookeepr2" is spelled differently from zookeeper1 and
  # zookeeper3 in every zk_connect below - confirm it is the real hostname.

  # WildFly application topics
  kafka {
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
    topic_id   => "wildfly-app-availability-data"
  }
  kafka {
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
    topic_id   => "wildfly-app-performance-data"
  }

  # HAProxy topic
  kafka {
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
    topic_id   => "haproxy-log-data"
  }

  # Solr topics
  kafka {
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
    topic_id   => "solr-log-data"
  }
  kafka {
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
    topic_id   => "solr-slow-requests-log"
  }

  # WildFly exception topic
  kafka {
    zk_connect => "zookeeper1:2181,zookeepr2:2181,zookeeper3:2181"
    topic_id   => "wildfly-app-exception-data"
  }
}
output {
  # Route each event to its own daily Elasticsearch index based on the
  # "type" field carried in the event (set by the shipping client).

  if [type] == "wildfly-app-availability-data" {
    elasticsearch {
      index           => "wfaad-%{+YYYY.MM.dd}"
      hosts           => ["elasticsearch-3"]
      manage_template => "false"
      workers         => 2
      flush_size      => 2000
      idle_flush_time => 30
      document_type   => "wfaad"
    }
  }

  if [type] == "wildfly-app-performance-data" {
    elasticsearch {
      index           => "wfapd-%{+YYYY.MM.dd}"
      hosts           => ["elasticsearch-3"]
      manage_template => "false"
      workers         => 2
      flush_size      => 2000
      idle_flush_time => 30
      document_type   => "wfapd"
    }
  }

  if [type] == "wildfly-app-exception-data" {
    elasticsearch {
      index           => "wfaed-%{+YYYY.MM.dd}"
      hosts           => ["elasticsearch-3"]
      manage_template => "false"
      workers         => 2
      flush_size      => 2000
      idle_flush_time => 30
      document_type   => "wfaed"
    }
  }

  if [type] == "haproxy-log-data" {
    elasticsearch {
      index           => "haproxy-%{+YYYY.MM.dd}"
      hosts           => ["elasticsearch-3"]
      manage_template => "true"
      workers         => 2
      flush_size      => 2000
      idle_flush_time => 30
      document_type   => "haproxy"
    }
  }

  if [type] == "solr-log-data" {
    elasticsearch {
      index           => "solr-log-%{+YYYY.MM.dd}"
      hosts           => ["elasticsearch-3"]
      manage_template => "true"
      workers         => 2
      flush_size      => 2000
      idle_flush_time => 30
      document_type   => "solr-log"
    }
  }

  # The slow-request shipper publishes to the Kafka topic
  # "solr-slow-requests-log" but tags its events with
  # type "solr-slow-req-log-data". Routing is done on the event's type,
  # not on the topic name, so the shipper's type must be matched here.
  # The topic-name value is still accepted for any client that uses it.
  if [type] == "solr-slow-req-log-data" or [type] == "solr-slow-requests-log" {
    # Optional debug sink, disabled:
    # file {
    #   path  => "/var/log/logstash/solr-file-out-%{+YYYY-MM-dd}"
    #   codec => rubydebug
    # }
    elasticsearch {
      index           => "solr-slow-requests-log-%{+YYYY.MM.dd}"
      hosts           => ["elasticsearch-3"]
      manage_template => "true"
      workers         => 2
      flush_size      => 2000
      idle_flush_time => 30
      document_type   => "solr-slow-requests-log"
    }
  }
}
To make sure the logs are getting into Logstash, I configured the events from this topic to be written to a local file, and Logstash creates those files without issue; it is only reading from the topic and writing to Elasticsearch that fails.
If it were an Elasticsearch issue, I would expect to see something related to the topic name in the Elasticsearch log, but I haven't seen anything with that topic name (or with the index name). I'm assuming there is a problem with Logstash, or it could be an issue with Kafka. In the Logstash output I'm seeing ":events_received=>0" for the "solr-slow-requests-log" config, while for the other configs events_received has some value, which I assume is the count of messages received.