Sending only the JSON portion of logs to Kafka when the input contains both JSON and non-JSON formats, in a Logstash config

I am trying to send only the JSON portion of each log line to a Kafka server, where the log lines contain both JSON and non-JSON parts.

Example log line:

880 <14>1 2018-08-06T10:49:05.89677+00:00 dev.hello-world 24f9ade2-1bdb-46c2-bc5c-4b25a277832e [APP/PROC/WEB/0] - - 2018-08-06 10:49:05.894 INFO 24 --- [nio-8080-exec-2] classnmae : {"Status":"from employee first page method","TransactionAfter":{"empId":"1","name":"emp1","designation":"manager","salary":3000.0},"Category":null,"Messages":{"Value":"EIPCLELOGS","Name":"Identifier"},"Header":{"TransactionType":"INFO","ServiceName":"class controllers.TestController","BusinessID2":"1","Hostname":"0e2edd3e-c649-472c-539d-6dcb/10.255.223.92","ComponentName":"firstPage","ApplicationID":"abc","Timestamp":"2018-08-06T10:49:05.883+0000","TransactionDomain":"Employee","BusinessID":"1","TransactionID":"1","ApplicationDomain":"Employee"},"TimeDuration":null,"TransactionBefore":"emp1","DataEncoding":null,"LogLevel":"INFO"}

The solution I tried:

filter {
  if "EIPCLELOGS" in [message] {
    grok {
      match => {
        "message" => [
          "(?<Fields1>[0-9-]+) <(?<Fields2>[0-9]+)>(?<Fields3>[0-9]+) %{TIMESTAMP_ISO8601:UTCtimestamp} %{JAVACLASS:class}-(?<Fields5>[a-z]+) (?<Fields6>[a-z0-9-]+) *\[%{DATA:thread}\] - - %{DATA:timestamp1} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:thread2}\] %{JAVACLASS:class2} *: %{GREEDYDATA:log}"
        ]
      }
    }

    json {
      source => "log"
      target => "parsedJson"
      remove_field => ["log"]
    }

    mutate {
      add_field => {
        "message" => "%{[parsedJson][message]}"
      }
    }
  }
}
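For reference, one way to get the parsed JSON fields at the top level of the event (instead of under a parsedJson wrapper) is to omit the target option on the json filter, which makes it write the parsed keys to the event root. This is only a sketch, assuming the grok pattern above has captured the JSON string into a field named log:

    filter {
      if "EIPCLELOGS" in [message] {
        # ... grok as above, capturing the JSON payload into "log" ...
        json {
          source => "log"          # parse the JSON string
          # no "target" => parsed keys (Status, Header, Messages, ...)
          # land at the event root instead of under a wrapper field
          remove_field => ["log"]  # drop the raw string once parsing succeeds
        }
      }
    }

Note that remove_field inside the json filter only runs if parsing succeeds, so the raw string is kept when the JSON is malformed.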

Expected output in Elasticsearch:

{"Status":"from employee first page method","TransactionAfter":{"empId":"1","name":"emp1","designation":"manager","salary":3000.0},"Category":null,"Messages":{"Value":"EIPCLELOGS","Name":"Identifier"},"Header":{"TransactionType":"INFO","ServiceName":"class .controllers.TestController","BusinessID2":"1","Hostname":"1554f7af-5d9c-4f19-4c48-0ca3/10.255.223.51","ComponentName":"firstPage","ApplicationID":"eip","Timestamp":"2018-08-06T10:15:58.483+0000","TransactionDomain":"Employee","BusinessID":"1","TransactionID":"1","ApplicationDomain":"Employee"},"TimeDuration":null,"TransactionBefore":"emp1","DataEncoding":null,"LogLevel":"INFO"}

I also tried prune:

prune {
  whitelist_names => ["using the wanted json format"]
}
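For what it's worth, whitelist_names takes regular expressions that are matched against top-level field names, so prune only helps once the JSON has already been parsed to the event root. A sketch, with the field names assumed from the sample log's JSON payload:

    filter {
      prune {
        # entries are regexes matched against top-level field names;
        # the names below are assumed from the sample log's JSON
        whitelist_names => ["^Status$", "^TransactionAfter$", "^Category$",
                            "^Messages$", "^Header$", "^TimeDuration$",
                            "^TransactionBefore$", "^DataEncoding$", "^LogLevel$"]
      }
    }

Anchoring each regex with ^ and $ avoids accidentally keeping fields whose names merely contain one of these strings.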

The if with grok works fine in the filter: the grok pattern matches correctly, but the later filters do not work. I have also tried removing the fields directly in mutate, which still does not work.

None of the above solutions seem to work. Any help would be appreciated. Thanks.

I moved this over to the Logstash forum. You might want to take some more time and properly format this post; it is pretty hard to read.

There is no field called message nested inside parsedJson, so this will not work.

@spinscale Sorry about the formatting. I will make changes.

@Badger I am not looking for a nested field. I want the whole JSON object as the output. Are these steps correct for extracting only the JSON from the log input?

Hi,
I have experimented more with this and I am almost at what I am looking for.

My Logstash config looks like:
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if "EIPCLELOGS" in [message] {
    grok {
      match => {
        "message" => [
          "(?<Fields1>[0-9-]+) <(?<Fields2>[0-9]+)>(?<Fields3>[0-9]+) %{TIMESTAMP_ISO8601:UTCtimestamp} %{JAVACLASS:class}-(?<Fields5>[a-z]+) (?<Fields6>[a-z0-9-]+) *\[%{DATA:thread}\] - - %{DATA:timestamp1} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:thread2}\] %{JAVACLASS:class2} *: %{GREEDYDATA:cleLog}"
        ]
      }
    }

    mutate {
      remove_field => [ "timestamp1", "pid", "port", "thread", "thread2", "level", "class2", "class", "UTCtimestamp", "Fields1", "Fields2", "Fields3", "Fields5", "Fields6", "host", "type", "message", "@version", "@timestamp" ]
      remove_tag   => [ "timestamp1", "pid", "port", "thread", "thread2", "level", "class2", "class", "UTCtimestamp", "Fields1", "Fields2", "Fields3", "Fields5", "Fields6", "host", "type", "message", "@version", "@timestamp" ]
    }
  }
}

output {
  if "eip" in [Header][ApplicationID] {
    kafka {
      codec => json {}
      bootstrap_servers => "kafka servers"
      topic_id => "cle-logs-eip"
    }
  }
}

My output in Kafka is:
{"cleLog":"{"Status":"from employee first page method","TransactionAfter":{"empId":"1","name":"emp1","designation":"manager","salary":3000.0},"Category":null,"Messages":{"Value":"EIPCLELOGS","Name":"Identifier"},"Header":{"TransactionType":"INFO","ServiceName":"class com.pepsico.eip.controllers.TestController","BusinessID2":"1","Hostname":"b484b154-2d07-473e-4cd0-f641/10.255.223.4","ComponentName":"firstPage","ApplicationID":"eip","Timestamp":"2018-08-07T12:27:01.730+0000","TransactionDomain":"Employee","BusinessID":"1","TransactionID":"1","ApplicationDomain":"Employee"},"TimeDuration":null,"TransactionBefore":"emp1","DataEncoding":null,"LogLevel":"INFO"}"}

Expected output:
{"Status":"from employee first page method","TransactionAfter":{"empId":"1","name":"emp1","designation":"manager","salary":3000.0},"Category":null,"Messages":{"Value":"EIPCLELOGS","Name":"Identifier"},"Header":{"TransactionType":"INFO","ServiceName":"class .eip.controllers.TestController","BusinessID2":"1","Hostname":"b484b154-2d07-473e-4cd0-f641/10.255.223.4","ComponentName":"firstPage","ApplicationID":"eip","Timestamp":"2018-08-07T12:27:01.730+0000","TransactionDomain":"Employee","BusinessID":"1","TransactionID":"1","ApplicationDomain":"Employee"},"TimeDuration":null,"TransactionBefore":"emp1","DataEncoding":null,"LogLevel":"INFO"}

Basically, I need two things here:

  1. remove the cleLog wrapper from the output, and
  2. send the logs to Kafka only when Header.ApplicationID is "eip".
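A sketch addressing both points, assuming the grok pattern above has captured the JSON string into cleLog: parse cleLog into the event root with the json filter (so Status, Header, etc. become top-level fields and there is no wrapper), then route on the parsed [Header][ApplicationID] in the output:

    filter {
      # ... grok as above, capturing the JSON payload into "cleLog" ...

      json {
        source => "cleLog"           # parse the captured JSON string
        # no "target": Status, Header, Messages, ... become top-level
        # fields, so the codec => json output has no "cleLog" wrapper
      }
      mutate {
        remove_field => ["cleLog"]   # drop the raw string after parsing
      }
    }

    output {
      # [Header][ApplicationID] only exists after the json filter has run
      if [Header][ApplicationID] == "eip" {
        kafka {
          codec => json {}
          bootstrap_servers => "kafka servers"
          topic_id => "cle-logs-eip"
        }
      }
    }

The == comparison is stricter than the original `in` substring check: it routes the event only when the field is exactly "eip".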

It would be a great help if someone could help me with this.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.