I have log messages like:
2017-01-06 19:27:53,893 INFO [[ACTIVE] ExecuteThread: '10' for queue: 'weblogic.kernel.Default (self-tuning)'] com.symantec.olp.sas.log.SummaryLogAspect - {country=JP, svid=182, typ=sassum, vid=1120, xffmode=0, initallocdate=2014-12-15T01:08:24Z, xffip=222.230.107.165, seatcnt=1, plgid=2, api=941753644, oslocale=JPN, fng=CJ6FRE1208VMNRQG, req=/228228131/28RXAAPB1DqJj/RSLHL940/EMBXtu+/f+/Zeb/KV1Q/DTXZBFC94ZE5AOmz/mDCqB7zJOARDQO/166180202502162557303662649078783407201612&D09DEEFB7E78065D?NAM=SFlBUy1WQTE2&MFN=VkFJTyBDb3Jwb3JhdGlvbg==&MFM=VkpQMTEx&OLA=JPN&OLO=JPN, lpmv=470, oslang=JPN, ctok=166180202502162557303662649078783407201612, resptime=119, epid=70D3B811A994477F957A90985109BE9D, campnid=0, remip=222.230.107.165, lictype=SOS, dbepid=70D3B811A994477F957A90985109BE9D, cid=nav1sasapppex02.msp.symantec.com1481215212435, status=10002, siid=240, skum=21356539, skup=01001230, psn=O749UPCN8KSY, cip=84.100.138.144, mname=VAIO Corporation, puid=1199, skuf=01100470, st=1481765738387, prbid=5967, mmodel=VJP111, clang=EN, pnfi=1120, cprbid=745, cpmv=7428, euip=222.230.107.165, prcdline=2, dvnm=HYAS-VA16, remdays=0, seatid=ah00s8CIdqUQyW2V, sasvid=106, xlsid=3730, baseactkey=186635290403122706518307794, coupon=651218, translogid=75033f05-9cf2-48e2-b924-fc2441d11d33}
2017-01-06 19:28:03,894 INFO [[ACTIVE] ExecuteThread: '10' for queue: 'weblogic.kernel.Default (self-tuning)'] com.symantec.olp.sas.log.SummaryLogAspect - {country=JP, svid=182, typ=sassum, vid=1120, xffmode=0, initallocdate=2014-12-15T01:08:24Z, xffip=222.230.107.165, seatcnt=1, plgid=2, api=228228131, oslocale=JPN, fng=1TA6U8RVL0JQXA0N, req=/228228131/28RXAAPB1DqJj/RSLHL940/EMBXtu+/f+/Zeb/KV1Q/DTXZBFC94ZE5AOmz/mDCqB7zJOARDQO/166180202502162557303662649078783407201612&D09DEEFB7E78065D?NAM=SFlBUy1WQTE2&MFN=VkFJTyBDb3Jwb3JhdGlvbg==&MFM=VkpQMTEx&OLA=JPN&OLO=JPN, lpmv=470, oslang=JPN, ctok=166180202502162557303662649078783407201612, resptime=119, epid=70D3B811A994477F957A90985109BE9D, campnid=0, remip=222.230.107.165, lictype=SOS, dbepid=70D3B811A994477F957A90985109BE9D, cid=nav1sasapppex02.msp.symantec.com1481215212435, status=0000, siid=240, skum=21356539, skup=01001230, psn=28MHHH2VPR4T, cip=222.230.107.165, mname=VAIO Corporation, puid=1199, skuf=01100470, st=1481765738387, prbid=5967, mmodel=VJP111, clang=EN, pnfi=1120, cprbid=745, cpmv=1027, euip=222.230.107.165, prcdline=2, dvnm=HYAS-VA16, remdays=0, seatid=StrlisGXA4yAt1ad, sasvid=130, xlsid=2820, baseactkey=028200017462383754273799438, coupon=123456, translogid=72df4536-6038-4d1c-b213-d0ff5c3c20fb}
I use the following grok pattern to match them:
(?m)%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:severity} \[%{GREEDYDATA:thread}\] %{JAVACLASS:className} - \{%{GREEDYDATA:logmsg}\}
After that, I use the KV filter to split logmsg into fields and keep only the fields I'm interested in. My question is: how can I validate the format of those fields? Note that the number of fields in logmsg varies from line to line, which is why I capture it with GREEDYDATA.
My Logstash.conf is as follows:
input {
  kafka {
    bootstrap_servers => "brokers_list"
    topics => ["transaction-log"]
    codec => "json"
  }
}
filter {
  grok {
    match => [ "message", "(?m)%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:severity} \[%{GREEDYDATA:thread}\] %{JAVACLASS:className} - \{%{GREEDYDATA:logmsg}\}" ]
    #overwrite => [ "message" ]
  }
  if "_grokparsefailure" not in [tags] {
    kv {
      field_split => ", "
      source => "logmsg"
      include_keys => ["api", "fng", "status", "cip", "cpmv", "translogid", "coupon", "baseactkey", "xlsid", "sasvid", "seatid", "srcHostname", "serverId" ]
      allow_duplicate_values => false
      remove_field => [ "message", "kafka.*", "logmsg"]
    }
  }
  if [api] != "228228131" {
    mutate { add_tag => "_grokparsefailure" }
  }
  date { # use timestamp from the log
    "match" => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
    target => "@timestamp"
  }
  mutate {
    remove_field => [ "timestamp" ] # remove unused stuff
  }
}
output {
  # events tagged as failed go to the invalid topic, everything else to the valid one
  if "_grokparsefailure" in [tags] {
    kafka {
      topic_id => "invalid topic"
      bootstrap_servers => "brokers_list"
      codec => json {}
    }
  } else {
    kafka {
      topic_id => "valid topic"
      bootstrap_servers => "brokers_list"
      codec => json { }
    }
  }
}
After parsing with the KV filter, I check the value of the api field, and if it is NOT equal to 228228131, I add the _grokparsefailure tag to the event and don't process it further.
I want to validate the format of the fields listed in include_keys, such as cip, which is the client IP. How can I validate the data format of those fields? Because my log lines contain a varying number of fields, I can't validate at the grok level; the fields are only available after the KV filter has parsed them. By validation I mean checking that each value conforms to the type defined in the ES index. If a value does not conform, I want to send the event to the invalid topic in Kafka.
Should I use the ruby filter to validate? If so, can you give me a sample? Or should I rebuild the event after KV parsing and run grok again on the newly created event?
I would very much appreciate a sample showing either approach.
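To make the question concrete, here is a minimal sketch of the kind of per-field check I mean, written as plain Ruby so it can be run outside Logstash. Everything in it is an assumption for illustration: the names `RULES`, `pattern`, `valid_ip?` and `invalid_fields` are hypothetical, and the formats (digits only for api and status, an IP address for cip, a UUID for translogid) are guessed from the two sample lines above, not taken from the real ES mapping.

```ruby
require 'ipaddr'

# Build a rule that passes when the whole value matches the given regex.
def pattern(re)
  ->(value) { !(re =~ value).nil? }
end

# True when the value parses as an IPv4 or IPv6 address.
def valid_ip?(value)
  return false if value.empty?
  IPAddr.new(value)
  true
rescue ArgumentError # IPAddr's own error classes inherit from ArgumentError
  false
end

# Hypothetical rules; the formats are guesses based on the sample log lines.
RULES = {
  'api'        => pattern(/\A\d+\z/),
  'status'     => pattern(/\A\d+\z/),
  'cip'        => ->(value) { valid_ip?(value) },
  'translogid' => pattern(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
}

# Return the names of the fields that are present but badly formatted.
# Missing fields are skipped, because the number of fields varies per line.
def invalid_fields(fields)
  RULES.reject do |key, rule|
    value = fields[key]
    value.nil? || (value.is_a?(String) && rule.call(value))
  end.keys
end

good = { 'api' => '941753644', 'status' => '10002', 'cip' => '84.100.138.144',
         'translogid' => '75033f05-9cf2-48e2-b924-fc2441d11d33' }
bad  = { 'api' => 'abc', 'cip' => 'not-an-ip' }

puts invalid_fields(good).inspect
puts invalid_fields(bad).inspect
```

Inside a ruby filter I assume the same logic would read each value with event.get and, when any field fails, call event.tag with a dedicated tag (say, a hypothetical _validationfailure) that the output section could test the same way it tests _grokparsefailure. I have not verified this against my Logstash version.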