Extracting the JSON portion of input logs containing both JSON and plain text

Hi,

I am trying to send the JSON portion of logs that contain both JSON and non-JSON content to a Kafka server. I have tried the configuration below and am very close to the solution I need.

My Logstash config looks like:
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if "EIPCLELOGS" in [message] {
    grok {
      match => {
        "message" => [
          "(?<Fields1>[0-9-]+) <(?<Fields2>[0-9]+)>(?<Fields3>[0-9]+) %{TIMESTAMP_ISO8601:UTCtimestamp} %{JAVACLASS:class}-(?<Fields5>[a-z]+) (?<Fields6>[a-z0-9-]+) *\[%{DATA:thread}\] - - %{DATA:timestamp1} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:thread2}\] %{JAVACLASS:class2} *: %{GREEDYDATA:cleLog}"
        ]
      }
    }

    mutate {
      remove_field => [ "timestamp1", "pid", "port", "thread", "thread2", "level", "class2", "class", "UTCtimestamp", "Fields1", "Fields2", "Fields3", "Fields5", "Fields6", "host", "type", "message", "@version", "@timestamp" ]
      remove_tag => [ "timestamp1", "pid", "port", "thread", "thread2", "level", "class2", "class", "UTCtimestamp", "Fields1", "Fields2", "Fields3", "Fields5", "Fields6", "host", "type", "message", "@version", "@timestamp" ]
    }
  }
}

output {
  if "eip" in [Header][ApplicationID] {
    kafka {
      codec => json
      bootstrap_servers => "kafka servers"
      topic_id => "cle-logs-eip"
    }
  }
}

My output in Kafka is:
{"cleLog":"{"Status":"from employee first page method","TransactionAfter":{"empId":"1","name":"emp1","designation":"manager","salary":3000.0},"Category":null,"Messages":{"Value":"EIPCLELOGS","Name":"Identifier"},"Header":{"TransactionType":"INFO","ServiceName":"class com.pepsico.eip.controllers.TestController","BusinessID2":"1","Hostname":"b484b154-2d07-473e-4cd0-f641/10.255.223.4","ComponentName":"firstPage","ApplicationID":"eip","Timestamp":"2018-08-07T12:27:01.730+0000","TransactionDomain":"Employee","BusinessID":"1","TransactionID":"1","ApplicationDomain":"Employee"},"TimeDuration":null,"TransactionBefore":"emp1","DataEncoding":null,"LogLevel":"INFO"}"}

Expected output:
{"Status":"from employee first page method","TransactionAfter":{"empId":"1","name":"emp1","designation":"manager","salary":3000.0},"Category":null,"Messages":{"Value":"EIPCLELOGS","Name":"Identifier"},"Header":{"TransactionType":"INFO","ServiceName":"class .eip.controllers.TestController","BusinessID2":"1","Hostname":"b484b154-2d07-473e-4cd0-f641/10.255.223.4","ComponentName":"firstPage","ApplicationID":"eip","Timestamp":"2018-08-07T12:27:01.730+0000","TransactionDomain":"Employee","BusinessID":"1","TransactionID":"1","ApplicationDomain":"Employee"},"TimeDuration":null,"TransactionBefore":"emp1","DataEncoding":null,"LogLevel":"INFO"}

Basically, I need two things here:

  1. remove the cleLog wrapper from the output, and
  2. send the logs to Kafka only when Header.ApplicationID is "eip"

It would be great if someone could help me with this.

Please format the JSON blobs as preformatted text (preferably pretty-printed; use e.g. jsonlint.com) so we can see exactly what they look like. What you posted has been mangled and isn't valid JSON.

@magnusbaeck
I want the output to be:
{"Status":"from employee first page method","TransactionAfter":{"empId":"1","name":"emp1","designation":"manager","salary":3000.0},"Category":null,"Messages":{"Value":"EIPCLELOGS","Name":"Identifier"},"Header":{"TransactionType":"INFO","ServiceName":"class .eip.controllers.TestController","BusinessID2":"1","Hostname":"b484b154-2d07-473e-4cd0-f641/10.255.223.4","ComponentName":"firstPage","ApplicationID":"eip","Timestamp":"2018-08-07T12:27:01.730+0000","TransactionDomain":"Employee","BusinessID":"1","TransactionID":"1","ApplicationDomain":"Employee"},"TimeDuration":null,"TransactionBefore":"emp1","DataEncoding":null,"LogLevel":"INFO"}

This is valid JSON.

The external wrapper, i.e. cleLog, is added when I use the grok filter pattern %{GREEDYDATA:cleLog}.

Basically, I am trying to remove the cleLog wrapper so that I get valid JSON as the output.
When I add cleLog to the remove_tag option as below:

remove_tag => ["timestamp1","pid","port","thread","thread2","level","class2","class","UTCtimestamp","Fields1","Fields2","Fields3","Fields5","Fields6","host","type","message","@version","@timestamp","cleLog"]

then no output is sent to Kafka at all.
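A temporary stdout output, something like the sketch below, is a handy way to see exactly which fields the event still carries before it reaches the Kafka conditional:

# temporary debug output: print every event with all of its fields
output {
  stdout { codec => rubydebug }
}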

I have found the solution for this.

I added the following to the filter section:

json {
  source => "cleLog"
}
mutate {
  remove_field => ["cleLog"]
  remove_tag => ["cleLog"]
}

That solved the problem.
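For reference, the combined filter and output sections should then look roughly like the sketch below (assembled from the snippets above, not re-tested end to end):

filter {
  if "EIPCLELOGS" in [message] {
    # pull the JSON payload out of the syslog line into the cleLog field
    grok {
      match => {
        "message" => [
          "(?<Fields1>[0-9-]+) <(?<Fields2>[0-9]+)>(?<Fields3>[0-9]+) %{TIMESTAMP_ISO8601:UTCtimestamp} %{JAVACLASS:class}-(?<Fields5>[a-z]+) (?<Fields6>[a-z0-9-]+) *\[%{DATA:thread}\] - - %{DATA:timestamp1} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:thread2}\] %{JAVACLASS:class2} *: %{GREEDYDATA:cleLog}"
        ]
      }
    }

    # parse the JSON string held in cleLog into top-level fields (Header, Status, ...)
    json {
      source => "cleLog"
    }

    # drop the raw cleLog wrapper and the other fields that should not reach Kafka
    # (remove_field is enough here; these names are fields, not tags)
    mutate {
      remove_field => [ "cleLog", "timestamp1", "pid", "port", "thread", "thread2", "level", "class2", "class", "UTCtimestamp", "Fields1", "Fields2", "Fields3", "Fields5", "Fields6", "host", "type", "message", "@version", "@timestamp" ]
    }
  }
}

output {
  # forward only the events whose parsed Header.ApplicationID contains "eip"
  if "eip" in [Header][ApplicationID] {
    kafka {
      codec => json
      bootstrap_servers => "kafka servers"
      topic_id => "cle-logs-eip"
    }
  }
}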

Thanks
