Parse Delimited values from JSON log

Hello

I have logs in the following format:
{
"timeMillis" : 1590564813191,
"thread" : "JOBSCHEDULER_THREAD_2",
"level" : "ALERT",
"loggerName" : "JobScheduler",
"message" : "value1*value2*value3*value4*",
"endOfBatch" : false,
"loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
"threadId" : 18,
"threadPriority" : 5
}

I was able to parse the JSON fields using the pattern below, i.e. the entire value of "message" in the JSON is mapped to a field called "message" and shown in Kibana.

json {
    source => "message"
}
date {
    timezone => "UTC"
    match => ["timeMillis", "UNIX_MS"]
    target => "@timestamp"
}

However, I am now trying to parse the values within the field "message", which are delimited by '*'. I want to get each value into a different field (the fields are pre-defined, as the pattern is constant).

I have tried different ways but have not succeeded. Please help me.

You could use mutate+split to turn the message field into an array of 4 strings, but if you want them to be top-level fields then what do you want them to be called?
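For example, a minimal sketch of that mutate+split approach (the four values then live in the [message] array rather than in top-level fields):

mutate {
    # split the string on "*" into an array, still stored under [message]
    split => { "message" => "*" }
}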

I want them to be top-level fields; say the field names are field1, field2, field3, field4.

It would be better to use a ruby filter to split the message into separate key-value pairs:

ruby {
	code => '
		message = event.get("message").split("*")
		message.each_index do |i|
			event.set("field#{i+1}", message[i])
		end
	'
}

You will get the following output:

   "message" => "value1*value2*value3*value4*",
    "field1" => "value1",
    "field2" => "value2",
    "field3" => "value3",
    "field4" => "value4",

Thanks Chitresh. I will try the approach you suggested, but in fact the field names field1, field2, etc. are not exact; I just gave some dummy names, so they do not follow a pattern that "field#{i+1}" could generate. The field names are "alertid", "kpiobserved", "kpithreshold", "version".

OK, so the ruby code would look more like

message = event.get("message").split("*")
event.set("alertid", message[0])
event.set("kpiobserved", message[1])
event.set("kpithreshold", message[2])
event.set("version", message[3])

Thank you so much, it is working now, but there is one issue... for message[0] it is actually capturing the entire message up to the first '*' (as pasted below), not the first value1 from the JSON field "message". It seems like it is interpreting the whole JSON event as the message, not just the "message" field.

I am assuming it can be fixed with a match filter; I will try that.

{
"timeMillis" : 1590564813191,
"thread" : "JOBSCHEDULER_THREAD_2",
"level" : "ALERT",
"loggerName" : "JobScheduler",
" message " : "value1

First parse the JSON object (the parent message), select the sub-object (the child message), and then split that message:

		# Parse the JSON string held in [message], then split the inner "message" field on "*"
		prmessage = JSON.parse(event.get("message"))
		chmessage = prmessage["message"].split("*")
		event.set("alertid", chmessage[0])
		event.set("kpiobserved", chmessage[1])
		event.set("kpithreshold", chmessage[2])
		event.set("version", chmessage[3])

Thanks Chitresh. But this is actually throwing a ruby exception here...

[2020-05-29T14:36:31,643][ERROR][logstash.filters.ruby    ][main] Ruby exception occurred: unexpected token at '{
"timeMillis" : 1590564829136,
"thread" : "New I/O worker #3",
"level" : "ALERT",
"loggerName" : "com.peg

A sample JSON log entry from the log file is below:

{
"timeMillis" : 1590564829136,
"thread" : "New I/O worker #3",
"level" : "ALERT",
"loggerName" : "com.pega.pegarules.data.internal.store.DataStorePreparedStatement",
"message" : "2020-05-27 07:33:49,136 GMT*8*PEGA0005*1085*500*62f9914ac12a17cbabacccadf2b2dc96*NA*NA*BP2GMKQR6ZORFVTHYFF40BNL8D52PQOQPA*NA*PegaSample*null*c5f8d11e1c6712b822be592273fd678f*N*0*BP2GMKQR6ZORFVTHYFF40BNL8D52PQOQPA*167*New I/O worker #3*STANDARD*com.pega.pegarules.data.internal.store.DataStorePreparedStatement*NA*NA*NA*NA*NA*****NA*NA*NA*NA*NAinitial Executable;0 additional frames in stack;*NADatabase operation took more than the threshold of 500 ms: 1,085 ms\tSQL: UPDATE data.pr_data_stream_sessions "PC0" SET "pytimeout" = ? ,"pylastseendatetime" = ? WHERE ( "PC0"."pyid" = ? )*",
"endOfBatch" : false,
"loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
"threadId" : 18,
"threadPriority" : 5
}

The Logstash configuration I am running:

input {
    beats {
        port => "5044"
    }
}
filter {
    json {
        source => "message"
    }
    ruby {
        code => '
            alertMessage = JSON.parse(event.get("message"))
            kpis = alertMessage["message"].split("*")
            event.set("alertTimestamp", kpis[0])
            event.set("pegaVersion", kpis[1])
            event.set("alertID", kpis[2])
            event.set("kpiObserved", kpis[3])
            event.set("kpithreshold", kpis[4])
            event.set("ServerID", kpis[5])
            event.set("tenantName", kpis[6])
            event.set("tenantHash", kpis[7])
            event.set("requestorID", kpis[8])
            event.set("userID", kpis[9])
            event.set("workPool", kpis[10])
            event.set("appVersion", kpis[11])
            event.set("encodedRuleset", kpis[12])
            event.set("checkoutEnabled", kpis[13])
            event.set("Interaction", kpis[14])
            event.set("correlationID", kpis[15])
        '
    }
}
output {

}

You have already parsed the JSON with a json filter, so [message] is no longer JSON and that parse will throw an exception. Replace those two lines with

kpis = event.get("message").split("*")

Thanks, but that is what I tried yesterday based on your previous post. If I just use "kpis = event.get("message").split("*")", then the entire JSON up to the first '*', including values from other fields as shown below, is set to kpis[0], which is an issue.

{
"timeMillis" : 1590564813191,
"thread" : "JOBSCHEDULER_THREAD_2",
"level" : "ALERT",
"loggerName" : "JobScheduler",
" message " : "value1

In one case you are parsing the JSON twice (so it fails because "2020-05-27 07:33:49,136 GMT*8*PEGA0005*1085*5..." is not a valid JSON object), in the other case you are not parsing it at all. You need to parse it exactly once.
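For completeness, a sketch of what the filter section could look like with exactly one parse (the json filter) followed by only the split in ruby; the field names are copied from your earlier config, and the remaining event.set lines can be added the same way:

filter {
    json {
        source => "message"
    }
    ruby {
        code => '
            # after the json filter, [message] holds only the inner "*"-delimited string
            kpis = event.get("message").split("*")
            event.set("alertTimestamp", kpis[0])
            event.set("pegaVersion", kpis[1])
            event.set("alertID", kpis[2])
            event.set("kpiObserved", kpis[3])
            event.set("kpithreshold", kpis[4])
        '
    }
}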