Hi,
I am trying to send only the JSON-formatted log entries to a Kafka server, from logs that contain both JSON and non-JSON lines. I have tried the configuration below and am very close to the solution I need.
My Logstash config looks like this:
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}
filter {
  if "EIPCLELOGS" in [message] {
    grok {
      match => {
        "message" => [
          "(?<Fields1>[0-9-]+) <(?<Fields2>[0-9]+)>(?<Fields3>[0-9]+) %{TIMESTAMP_ISO8601:UTCtimestamp} %{JAVACLASS:class}-(?<Fields5>[a-z]+) (?<Fields6>[a-z0-9-]+) *\[%{DATA:thread}\] - - %{DATA:timestamp1} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:thread2}\] %{JAVACLASS:class2} *: %{GREEDYDATA:cleLog}"
        ]
      }
    }
    mutate {
      remove_field => [ "timestamp1","pid","port","thread","thread2","level","class2","class","UTCtimestamp","Fields1","Fields2","Fields3","Fields5","Fields6","host","type","message","@version","@timestamp" ]
      remove_tag => ["timestamp1","pid","port","thread","thread2","level","class2","class","UTCtimestamp","Fields1","Fields2","Fields3","Fields5","Fields6","host","type","message","@version","@timestamp" ]
    }
  }
}
output {
  if "eip" in [Header][ApplicationID] {
    kafka {
      codec => json{}
      bootstrap_servers => "kafka servers"
      topic_id => "cle-logs-eip"
    }
  }
}
My output in Kafka is:
{"cleLog":"{"Status":"from employee first page method","TransactionAfter":{"empId":"1","name":"emp1","designation":"manager","salary":3000.0},"Category":null,"Messages":{"Value":"EIPCLELOGS","Name":"Identifier"},"Header":{"TransactionType":"INFO","ServiceName":"class com.pepsico.eip.controllers.TestController","BusinessID2":"1","Hostname":"b484b154-2d07-473e-4cd0-f641/10.255.223.4","ComponentName":"firstPage","ApplicationID":"eip","Timestamp":"2018-08-07T12:27:01.730+0000","TransactionDomain":"Employee","BusinessID":"1","TransactionID":"1","ApplicationDomain":"Employee"},"TimeDuration":null,"TransactionBefore":"emp1","DataEncoding":null,"LogLevel":"INFO"}"}
Expected output:
{"Status":"from employee first page method","TransactionAfter":{"empId":"1","name":"emp1","designation":"manager","salary":3000.0},"Category":null,"Messages":{"Value":"EIPCLELOGS","Name":"Identifier"},"Header":{"TransactionType":"INFO","ServiceName":"class .eip.controllers.TestController","BusinessID2":"1","Hostname":"b484b154-2d07-473e-4cd0-f641/10.255.223.4","ComponentName":"firstPage","ApplicationID":"eip","Timestamp":"2018-08-07T12:27:01.730+0000","TransactionDomain":"Employee","BusinessID":"1","TransactionID":"1","ApplicationDomain":"Employee"},"TimeDuration":null,"TransactionBefore":"emp1","DataEncoding":null,"LogLevel":"INFO"}
Basically, I need two things here:
- remove the cleLog wrapper from the output, and
- send the logs to Kafka only when Header.ApplicationID is "eip".
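One direction that might cover both points, sketched here untested: parse cleLog with the json filter so that its keys become top-level fields, drop the wrapper field, and compare [Header][ApplicationID] with == in the output. This assumes the json filter plugin is installed and that cleLog always holds a valid JSON object. The existing grok and mutate blocks are left out for brevity.

```
filter {
  if "EIPCLELOGS" in [message] {
    # ... existing grok that captures the JSON payload into cleLog ...

    # Parse the JSON string held in cleLog. With no target set, the parsed
    # keys (Status, Header, ...) are placed at the top level of the event.
    json {
      source => "cleLog"
      # Applied only when parsing succeeds, so malformed payloads keep cleLog
      remove_field => [ "cleLog" ]
    }
  }
}
output {
  # [Header][ApplicationID] exists only after cleLog has been parsed
  if [Header][ApplicationID] == "eip" {
    kafka {
      codec => json
      bootstrap_servers => "kafka servers"   # placeholder, as in my config
      topic_id => "cle-logs-eip"
    }
  }
}
```

If parsing fails, the json filter tags the event with _jsonparsefailure. Such an event has no [Header][ApplicationID] field, so the output conditional would not send it to Kafka.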
It would be great if someone could help me with this.