This is my config for logstash (filter file )
filter {
if "ETL_log_enriched" in [tags] {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{LOGLEVEL:loglevel}|%{WORD:loglevel}) (?[^]]+)- %{GREEDYDATA:msgbody}"
}
}
mutate
{
add_field => ["event_read_time", "%{@timestamp}"]
}
date {
match => ["log_timestamp", "yyyy-MM-dd HH:mm:ss"]
}
grok {
match => {
"msgbody" => "(?<log_category>\w+(?=-->))"
}
}
grok{
match => {
"msgbody" => "(?<log_value>(?<=-->)(.+))"
}
}
grok{
match => {
"msgbody" => "(?<Loggable_Message>(?<=ETL -)(.+))"
}
}
if [log_category] == "KPI" or [log_category] == "EXCEPTION" {
ruby {
code => "
log_value=event.get('log_value')
entities=log_value.split('||',5)
event.set('appName',entities[0])
event.set('repoName',entities[1])
event.set('resource_name',entities[2])
event.set('resource_type',entities[3])
kpi_list_string=entities[4]
kpi_list_string[0]=''
kpi_list_string[kpi_list_string.length-1]=''
kpi_list=kpi_list_string.split('||')
kpi_list.each do |kv_pair|
key=kv_pair.split(':',2)[0]
value=kv_pair.split(':',2)[1]
is_integer=value.match(/\A[+-]?\d+?(\.\d+)?\Z/) == nil ? false : true
if(is_integer==true)
value = value.to_i
end
event.set(key,value)
end
"
}
}
else if [log_category] == "STATUS"{
ruby {
code => "
log_value=event.get('log_value')
entities=log_value.split('||',4)
event.set('appName',entities[0])
event.set('repoName',entities[1])
event.set('resource_name',entities[2])
event.set('resource_status',entities[3])
"
}
else if [loglevel] == "KPI" or [loglevel] == "ERROR" or [loglevel] == "INFO" {
if [Loggable_Message] != "" {
ruby {
code => "
Loggable_Message=event.get('Loggable_Message')
entities=Loggable_Message.split('||',3)
event.set('appName',entities[1])
event.set('repoName',entities[0])
kpi_list_string=entities[2]
kpi_list_string[0]=''
kpi_list_string[kpi_list_string.length-1]=''
kpi_list=kpi_list_string.split('||')
kpi_list.each do |kv_pair|
key=kv_pair.split(':',2)[0]
value=kv_pair.split(':',2)[1]
is_integer=value.match(/\A[+-]?\d+?(\.\d+)?\Z/) == nil ? false : true
if(is_integer==true)
value = value.to_i
end
event.set(key,value)
end
"
}
}
}
mutate {
remove_field => ["log_value","msgbody","Loggable_Message]
}
}
}
Input (logs)
2019-06-24 14:54:20 INFO etl - KPI-->Spark ETL Pipeline For Unstructured Repo||PANGAEA||classification/PANGAEA/056050050048056056046065069065071078065080:PANGAEA.json||STORE||(timeTakenToStoreInS3:85)
2019-06-24 14:54:20 KPI ETL - effr||Spark ETL Pipeline For Unstructured Repo||ResourceType:PIPE||pipeName:storeClassification||run_time:96||containerS3Key:pangaea/2019-03-06/PANGAEA.880228/
I am getting Ruby exception occurred: undefined method `split' for nil:NilClass.