How to validate the data after parsing with KV Filter

I have log messages like:

2017-01-06 19:27:53,893 INFO [[ACTIVE] ExecuteThread: '10' for queue: 'weblogic.kernel.Default (self-tuning)'] com.symantec.olp.sas.log.SummaryLogAspect - {country=JP, svid=182, typ=sassum, vid=1120, xffmode=0, initallocdate=2014-12-15T01:08:24Z, xffip=222.230.107.165, seatcnt=1, plgid=2, api=941753644, oslocale=JPN, fng=CJ6FRE1208VMNRQG, req=/228228131/28RXAAPB1DqJj/RSLHL940/EMBXtu+/f+/Zeb/KV1Q/DTXZBFC94ZE5AOmz/mDCqB7zJOARDQO/166180202502162557303662649078783407201612&D09DEEFB7E78065D?NAM=SFlBUy1WQTE2&MFN=VkFJTyBDb3Jwb3JhdGlvbg==&MFM=VkpQMTEx&OLA=JPN&OLO=JPN, lpmv=470, oslang=JPN, ctok=166180202502162557303662649078783407201612, resptime=119, epid=70D3B811A994477F957A90985109BE9D, campnid=0, remip=222.230.107.165, lictype=SOS, dbepid=70D3B811A994477F957A90985109BE9D, cid=nav1sasapppex02.msp.symantec.com1481215212435, status=10002, siid=240, skum=21356539, skup=01001230, psn=O749UPCN8KSY, cip=84.100.138.144, mname=VAIO Corporation, puid=1199, skuf=01100470, st=1481765738387, prbid=5967, mmodel=VJP111, clang=EN, pnfi=1120, cprbid=745, cpmv=7428, euip=222.230.107.165, prcdline=2, dvnm=HYAS-VA16, remdays=0, seatid=ah00s8CIdqUQyW2V, sasvid=106, xlsid=3730, baseactkey=186635290403122706518307794, coupon=651218, translogid=75033f05-9cf2-48e2-b924-fc2441d11d33}
2017-01-06 19:28:03,894 INFO [[ACTIVE] ExecuteThread: '10' for queue: 'weblogic.kernel.Default (self-tuning)'] com.symantec.olp.sas.log.SummaryLogAspect - {country=JP, svid=182, typ=sassum, vid=1120, xffmode=0, initallocdate=2014-12-15T01:08:24Z, xffip=222.230.107.165, seatcnt=1, plgid=2, api=228228131, oslocale=JPN, fng=1TA6U8RVL0JQXA0N, req=/228228131/28RXAAPB1DqJj/RSLHL940/EMBXtu+/f+/Zeb/KV1Q/DTXZBFC94ZE5AOmz/mDCqB7zJOARDQO/166180202502162557303662649078783407201612&D09DEEFB7E78065D?NAM=SFlBUy1WQTE2&MFN=VkFJTyBDb3Jwb3JhdGlvbg==&MFM=VkpQMTEx&OLA=JPN&OLO=JPN, lpmv=470, oslang=JPN, ctok=166180202502162557303662649078783407201612, resptime=119, epid=70D3B811A994477F957A90985109BE9D, campnid=0, remip=222.230.107.165, lictype=SOS, dbepid=70D3B811A994477F957A90985109BE9D, cid=nav1sasapppex02.msp.symantec.com1481215212435, status=0000, siid=240, skum=21356539, skup=01001230, psn=28MHHH2VPR4T, cip=222.230.107.165, mname=VAIO Corporation, puid=1199, skuf=01100470, st=1481765738387, prbid=5967, mmodel=VJP111, clang=EN, pnfi=1120, cprbid=745, cpmv=1027, euip=222.230.107.165, prcdline=2, dvnm=HYAS-VA16, remdays=0, seatid=StrlisGXA4yAt1ad, sasvid=130, xlsid=2820, baseactkey=028200017462383754273799438, coupon=123456, translogid=72df4536-6038-4d1c-b213-d0ff5c3c20fb}

I use the below grok pattern to match against these:

(?m)%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:severity} \[%{GREEDYDATA:thread}\] %{JAVACLASS:className} - \{%{GREEDYDATA:logmsg}\}

After that, I use the kv filter to split the logmsg field and keep only the keys that are of interest to me. My question is: how can I validate the format of those fields? One thing I should mention: the logs contain a varying number of fields in logmsg, which is why I used GREEDYDATA.

My Logstash.conf is as follows:

input {
  kafka {
    bootstrap_servers => "brokers_list"
    topics => ["transaction-log"]
    codec => "json"
  }
}

filter {
        grok {
            match => [ "message", "(?m)%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:severity} \[%{GREEDYDATA:thread}\] %{JAVACLASS:className} - \{%{GREEDYDATA:logmsg}\}" ]
            #overwrite => [ "message" ]
        }
        
        if "_grokparsefailure" not in [tags] {
           kv {
              field_split => ", "
              source => "logmsg"
              include_keys => ["api", "fng", "status", "cip", "cpmv", "translogid", "coupon", "baseactkey", "xlsid", "sasvid", "seatid", "srcHostname", "serverId" ]
              allow_duplicate_values => false
              remove_field => [ "message", "kafka.*", "logmsg"]
           }
        }

        if [api] != "228228131" {
           mutate { add_tag => "_grokparsefailure" }
        }

        date { # use timestamp from the log
          "match" => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
          target => "@timestamp"
        }

        mutate {
          remove_field => [ "timestamp" ]  # remove unused stuff
        }
}

output {
  if "_grokparsefailure" not in [tags] {
	  kafka {
	    topic_id => "invalid topic"
	    bootstrap_servers => "brokers_list"
	    codec => json {}
	  }
   } else {
	  kafka {
	    topic_id => "valid topic"
	    bootstrap_servers => "brokers_list"
	    codec => json { }
	  }
   }
}

After parsing with the kv filter, I check the value of the api field: if it is NOT equal to 228228131, I add the _grokparsefailure tag to the event and don't process it further.

I want to be able to validate the format of the fields listed in include_keys, such as cip, which is a client IP. How can I validate the data format for those fields? Since my logs contain a varying number of fields, I can't validate at the grok level; only after kv parsing do I have the individual fields to validate. By validation I mean checking that they conform to the types defined in the ES index, because if they don't, I want to send those events to the invalid-topic in Kafka.

Should I use a ruby filter to validate? If so, can you give me a sample? Or should I rebuild the event after kv parsing and run grok again on the newly created event?

Would very much appreciate a sample showing this.
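For illustration, the per-field checks a ruby filter would run are plain Ruby; the stdlib IPAddr class can validate an IP like cip, and a regex can validate numeric fields like api or coupon. This is only a sketch — the helper names and the `_validationfailure` tag are made up for the example, not part of the original config:

```ruby
require 'ipaddr'

# True when str parses as an IPv4/IPv6 address, false otherwise.
def valid_ip?(str)
  return false if str.nil?
  IPAddr.new(str)
  true
rescue IPAddr::InvalidAddressError
  false
end

# True when str is a non-negative integer string.
def valid_int?(str)
  !str.to_s.match(/\A\d+\z/).nil?
end

# Inside a Logstash ruby filter this logic would be wired up roughly as:
#   ruby {
#     code => 'event.tag("_validationfailure") unless event.get("cip").to_s =~ /\A\d{1,3}(\.\d{1,3}){3}\z/'
#   }

puts valid_ip?("222.230.107.165")  # a cip value from the sample logs
puts valid_ip?("not-an-ip")
puts valid_int?("228228131")       # the expected api value
```

The output section can then route on the tag, the same way the existing _grokparsefailure conditional does.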

@magnusbaeck Any suggestions please?

Logstash is not that great at validation.

The best thing I can think of would be to run the fields through filters that can parse them (the date filter for timestamps, or grok's IP pattern for addresses), then use conditionals on the failure tags they add when they can't.
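That approach could look roughly like the following in logstash.conf. This is a sketch under assumptions: the `_validationfailure` tag name is arbitrary, and `%{IP}`/`%{NONNEGINT}` are the built-in grok patterns used purely as format checks on fields the kv filter has already extracted:

```conf
filter {
  # Re-run grok on individual kv-extracted fields purely as a format check.
  # A failed match adds the tag, which the output section can route on.
  if [cip] {
    grok {
      match          => { "cip" => "^%{IP}$" }
      tag_on_failure => [ "_validationfailure" ]
    }
  }
  if [coupon] {
    grok {
      match          => { "coupon" => "^%{NONNEGINT}$" }
      tag_on_failure => [ "_validationfailure" ]
    }
  }
}
```

Routing then becomes `if "_validationfailure" in [tags]` in the output block, analogous to the existing _grokparsefailure check.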

Please don't ping people like that unless they are already in the topic answering. If you'd like guaranteed answers, there are support subscriptions; otherwise, the time most of us spend here is voluntary, so we ask that you respect that :slight_smile:

My sincerest apologies, @warkolm. Thanks for letting me know. We do have a support subscription, but it was Friday :(. I have been at this for so long, and I thought that if I could get an answer, I could try it out over the weekend. Thank you.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.