Hello,
I have kafka-logstash conf and logstash reciveing the logs from kafka. here is the config.
input {
kafka {
topics => ["sitlogtopic","locallogtopic"]
bootstrap_servers => "ddr-kafkadev.pvt.ccilindia.com:9092"
#auto_offset_reset => "earliest"
#consumer_threads => 1
#decorate_events => true
}
}
filter{
grok { match => { "message" => ["%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} %{WORD:hostname} (?<SpringAppName>%{WORD}\-%{WORD}) %{GREEDYDATA:MESSAGE}",
"%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} %{WORD:hostname} %{WORD:SpringAppName} %{GREEDYDATA:MESSAGE}","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} %{WORD:HOSTNAME} (?<SpringAppName>%{WORD}\ %{WORD}) %{GREEDYDATA:MESSAGE}","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} (?<hostname>%{WORD}\-%{WORD}\-%{WORD}) %{WORD:SpringAppName} %{GREEDYDATA:MESSAGE}","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} (?<hostname>%{WORD}\-%{WORD}\-%{WORD}) (?<SpringAppName>%{WORD}\ %{WORD}) %{GREEDYDATA:MESSAGE}","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} (?<hostname>%{WORD}\-%{WORD}\-%{WORD}) %{WORD:SpringAppName} %{GREEDYDATA:MESSAGE}","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} (?<hostname>%{WORD}\-%{WORD}\-%{WORD}\-%{WORD}\-%{WORD}\-%{WORD}) (?<SpringAppName>%{WORD}\ %{WORD}) %{GREEDYDATA:MESSAGE}","(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}) %{LOGLEVEL:level} %{WORD:hostname} (?<SpringAppName>%{WORD}) (?<MESSAGE>(.|\r|\n)*)","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} (?<hostname>%{WORD}\-%{WORD}\-%{WORD}\-%{WORD}\-%{WORD}\-%{WORD}) (?<SpringAppName>%{WORD}\ %{WORD}) %{GREEDYDATA:MESSAGE}","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} (?<hostname>%{WORD}\-%{WORD}) (?<SpringAppName>%{WORD}\ %{WORD}) %{GREEDYDATA:MESSAGE}","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} %{WORD:hostname} (?<SpringAppName>%{WORD}\-%{WORD}) (?<stacktrace>(.|\r|\n)*)","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} (?<hostname>%{WORD}\-%{WORD}\-%{WORD}\-%{WORD}\-%{WORD}\-%{WORD}) (?<SpringAppName>%{WORD}\-%{WORD}) %{GREEDYDATA:MESSAGE}","%{DATESTAMP:Timestamp} %{LOGLEVEL:Loglevel} (?<hostname>%{WORD}\-%{WORD}-%{WORD}) (?<SpringAppName>%{WORD}\-%{WORD}) %{GREEDYDATA:MESSAGE}"]}
}
#mutate { add_field => { "SpringAppName" => "%{Ap}%{Service}" } }
mutate {
remove_field => ['message','event.original']
}
}
output {
elasticsearch {
hosts => ["https://elastic-uat.ccilindia.net:9200"]
#index => "kafkadev-%{+yyyy.MM.dd}"
ilm_rollover_alias => "kafkadev"
ilm_pattern => "000001"
ilm_policy => "kafkadev"
ilm_enabled => true
cacert => "/etc/logstash/certs/GeoTrust-RSA-CA-Intermediate-2018.pem"
user => "elastic"
password => "Ccil@2023"
ssl => true
ssl_certificate_verification => true
}
stdout { codec => rubydebug }
}
but I have another server where I am sending filebeat logs to logstash but i am seeing the same kafka logs on filebeat index . here the filbeat logstash config.
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["https://elastic-uat.ccilindia.net:9200"]
user => "elastic"
password => "Ccil@2023"
cacert => "/etc/logstash/certs/GeoTrust-RSA-CA-Intermediate-2018.pem"
index => "gitlab-filebeat-8.7"
ssl => true
ssl_certificate_verification => false
}
stdout
{
codec =>rubydebug
}
}