This is my config for logstash (filter file )
filter {
if "ETL_log_enriched"  in [tags] {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{LOGLEVEL:loglevel}|%{WORD:loglevel}) (?[^]]+)- %{GREEDYDATA:msgbody}"
}
}
mutate
{
add_field => ["event_read_time", "%{@timestamp}"]
}
date {
match => ["log_timestamp", "yyyy-MM-dd HH:mm:ss"]
}
grok {
match => {
"msgbody" => "(?<log_category>\w+(?=-->))"
}
}
grok{
match => {
"msgbody" => "(?<log_value>(?<=-->)(.+))"
}
}
grok{
match => {
"msgbody" => "(?<Loggable_Message>(?<=ETL -)(.+))"
}
}
    if [log_category] == "KPI" or [log_category] == "EXCEPTION" {
        ruby {
            code => "
                log_value=event.get('log_value')
                entities=log_value.split('||',5)
                event.set('appName',entities[0])
                event.set('repoName',entities[1])
                event.set('resource_name',entities[2])
                event.set('resource_type',entities[3])
                kpi_list_string=entities[4]
                kpi_list_string[0]=''
                kpi_list_string[kpi_list_string.length-1]=''
                kpi_list=kpi_list_string.split('||')
                kpi_list.each do |kv_pair|
                    key=kv_pair.split(':',2)[0]
                    value=kv_pair.split(':',2)[1]
                    is_integer=value.match(/\A[+-]?\d+?(\.\d+)?\Z/) == nil ? false : true
                    if(is_integer==true)
                        value = value.to_i
                    end
                    event.set(key,value)
                end
            "
        }
    } 
    else if [log_category] == "STATUS"{
        ruby {
            code => "
                log_value=event.get('log_value')
                entities=log_value.split('||',4)
                event.set('appName',entities[0])
                event.set('repoName',entities[1])
                event.set('resource_name',entities[2])
                event.set('resource_status',entities[3])
            "
        }
    else if [loglevel] == "KPI" or [loglevel] == "ERROR" or [loglevel] == "INFO" {
    if [Loggable_Message] != "" {
        ruby {
            code => "
                Loggable_Message=event.get('Loggable_Message')
                entities=Loggable_Message.split('||',3)
                event.set('appName',entities[1])
                event.set('repoName',entities[0])
                kpi_list_string=entities[2]
                kpi_list_string[0]=''
                kpi_list_string[kpi_list_string.length-1]=''
                kpi_list=kpi_list_string.split('||')
                kpi_list.each do |kv_pair|
                    key=kv_pair.split(':',2)[0]
                    value=kv_pair.split(':',2)[1]
                    is_integer=value.match(/\A[+-]?\d+?(\.\d+)?\Z/) == nil ? false : true
                    if(is_integer==true)
                        value = value.to_i
                    end
                    event.set(key,value)
                end
            "
           }
       }
    }
    mutate {
        remove_field => ["log_value","msgbody","Loggable_Message]
    }
}
}
Input (logs)
2019-06-24 14:54:20 INFO  etl - KPI-->Spark ETL Pipeline For Unstructured Repo||PANGAEA||classification/PANGAEA/056050050048056056046065069065071078065080:PANGAEA.json||STORE||(timeTakenToStoreInS3:85)
2019-06-24 14:54:20 KPI   ETL - effr||Spark ETL Pipeline For Unstructured Repo||ResourceType:PIPE||pipeName:storeClassification||run_time:96||containerS3Key:pangaea/2019-03-06/PANGAEA.880228/
I am getting Ruby exception occurred: undefined method `split' for nil:NilClass.