What is the grok pattern that can match variable length documents from a single log file?

I have records/documents of variable lengths, say 3 types of log entries coming in a single log file.

What grok pattern can match these variable-length documents from a single log file?

The grok pattern will be used in the Kibana interface for creating the index and ingest pipeline using the 'upload a sample file' feature provided by Kibana.

regards
shiny

You can have multiple patterns as a list in a grok. If you set break_on_match => true it will exit after the first one that matches. Then you just need to construct each pattern so it will only match one of the line types.

Sir,

Thanks for the suggestion,

I modified the grok in the ingest pipeline to include the 2 patterns the log file contains, and it worked: it accepted all entries in the log file from Filebeat. I am not using Logstash,
but where do I apply "break_on_match => true"? I couldn't find anything similar in the ingest pipeline syntax.

The following is the ingest pipeline modification I made. Hope it is OK.

PUT _ingest/pipeline/lp_index-fg-email1test-pipeline
{
  "description": "Ingest pipeline created by text structure finder",
  "processors": [
{
  "grok": {
    "field": "message",
    "patterns": [
      "%{TIMESTAMP_ISO8601:timestamp} _gateway date=(?<date>%{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY}) time=(?<time>%{INT}:%{INT}:%{INT}) devname=%{QUOTEDSTRING:devname} devid=%{QUOTEDSTRING:devid} eventtime=%{NUMBER:eventtime} tz=%{QUOTEDSTRING:tz} logid=%{QUOTEDSTRING:logid} type=%{QUOTEDSTRING:type} subtype=%{QUOTEDSTRING:subtype} eventtype=%{QUOTEDSTRING:eventtype} level=%{QUOTEDSTRING:level} vd=%{QUOTEDSTRING:vd} policyid=%{NUMBER:policyid} sessionid=%{NUMBER:sessionid} srcip=%{IPV4:srcip} srcport=%{NUMBER:srcport} srcintf=%{QUOTEDSTRING:srcintf} srcintfrole=%{QUOTEDSTRING:srcintfrole} dstip=%{IPV4:dstip} dstport=%{NUMBER:dstport} dstintf=%{QUOTEDSTRING:dstintf} dstintfrole=%{QUOTEDSTRING:dstintfrole} proto=%{NUMBER:proto}",
      "%{TIMESTAMP_ISO8601:timestamp} _gateway date=(?<date>%{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY}) time=(?<time>%{INT}:%{INT}:%{INT}) devname=%{QUOTEDSTRING:devname} devid=%{QUOTEDSTRING:devid} eventtime=%{NUMBER:eventtime} tz=%{QUOTEDSTRING:tz} logid=%{QUOTEDSTRING:logid} type=%{QUOTEDSTRING:type} subtype=%{QUOTEDSTRING:subtype} eventtype=%{QUOTEDSTRING:eventtype} level=%{QUOTEDSTRING:level} vd=%{QUOTEDSTRING:vd} policyid=%{NUMBER:policyid} sessionid=%{NUMBER:sessionid} srcip=%{IPV4:srcip} srcport=%{NUMBER:srcport} srcintf=%{QUOTEDSTRING:srcintf} srcintfrole=%{QUOTEDSTRING:srcintfrole} dstip=%{IPV4:dstip} dstport=%{NUMBER:dstport} dstintf=%{QUOTEDSTRING:dstintf} dstintfrole=%{QUOTEDSTRING:dstintfrole} proto=%{NUMBER:proto} from=%{QUOTEDSTRING:from} to=%{QUOTEDSTRING:to} sender=%{QUOTEDSTRING:sender} recipient=%{QUOTEDSTRING:recipient} direction=%{QUOTEDSTRING:direction} msg=%{QUOTEDSTRING:msg} subject=%{QUOTEDSTRING:subject} size=%{QUOTEDSTRING:size} attachment=%{QUOTEDSTRING:attachment}"
    ]
  }
},
{
  "date": {
    "field": "timestamp",
    "formats": [
      "ISO8601"
    ],
    "output_format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSXXX"
  }
},
{
  "convert": {
    "field": "dstport",
    "type": "long",
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "eventtime",
    "type": "long",
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "policyid",
    "type": "long",
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "proto",
    "type": "long",
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "sessionid",
    "type": "long",
    "ignore_missing": true
  }
},
{
  "convert": {
    "field": "srcport",
    "type": "long",
    "ignore_missing": true
  }
},
{
  "remove": {
    "field": "timestamp"
  }
},
{
  "remove": {
    "field": "tz"
  }
}

]
}

thanks
shini

Just a tip: you do not need grok to parse a message like this. From what you shared it seems to be a Fortigate log event, which is a message made up of key-value pairs.

Looking at your grok pattern, you could replace all of it with a combination of a dissect processor and a kv processor.

Your dissect processor could be something like this:

{
  "dissect": {
    "field": "message",
    "pattern": "%{timestamp} _gateway %{kvmsg}"
  }
}

Then, after that processor, you could use the following kv processor:

{
  "kv": {
    "field": "kvmsg",
    "field_split": " ",
    "value_split": "="
  }
}

Those processors would do the same as your grok processor, but would use less CPU.
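
If you want to check the result before saving the pipeline, you can run both processors through the simulate API. This is only a sketch, and the sample message is made up and shortened to a few key-value pairs:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} _gateway %{kvmsg}"
        }
      },
      {
        "kv": {
          "field": "kvmsg",
          "field_split": " ",
          "value_split": "="
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2021-09-27T03:35:53.263029+05:30 _gateway devname=\"PPFW01\" devid=\"FG100D\" type=\"utm\""
      }
    }
  ]
}

The response shows the resulting document, and adding ?verbose=true to the URL shows the output of each processor, which makes it easier to see where the parsing stops.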

Hi Shini,
I was referring to the Logstash syntax.

However, looking at the docs for Elasticsearch, it seems that it always breaks on the first match, so the parameter is unnecessary:
Grok processor | Elasticsearch Guide [master] | Elastic.

I suppose if you wanted them all to match you would have to add them as separate processors.
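
Something like this, as an untested sketch; the pipeline name and the patterns here are only placeholders, and ignore_failure keeps a processor whose pattern does not match from failing the whole document:

PUT _ingest/pipeline/example-two-groks
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [ "%{TIMESTAMP_ISO8601:timestamp} _gateway %{GREEDYDATA:kvmsg}" ],
        "ignore_failure": true
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [ "%{SYSLOGTIMESTAMP:timestamp} %{GREEDYDATA:rest}" ],
        "ignore_failure": true
      }
    }
  ]
}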

Stuart

sir,

This is a great idea, thanks a lot. I am just a beginner here.
As you said, I am trying to handle Fortinet logs.

I removed the grok processor and added the dissect and kv processors for the ingest node,

but nothing gets logged in the index (I didn't change the index).
Can you please explain how to troubleshoot? My ingest node simulate syntax is not working. It gives an error for "field": "message" itself.

thanks and regards
shini

Sir,
Thank you for the reply
As you said, it seems that it always breaks on the first match, and all log entries are indexed correctly. But as suggested in the earlier reply, separate dissect and kv processors can work better.

For the sample .log file

2021-09-27T03:35:53.263029+05:30 _gateway devname="PPFW01"
2021-09-27T03:42:22.549689+05:30 _gateway devname="PPFW02"
2021-09-27T04:02:36.089881+05:30 _gateway devname="PPFW03"
2021-09-27T04:05:06.749370+05:30 _gateway devname="PPFW04"
2021-09-27T04:10:04.904638+05:30 _gateway devname="PPFW05"
2021-09-27T04:03:18.223319+05:30 _gateway devname="PPFW06" from="aditya@gmail.com" attachment="yes"
2021-09-27T04:10:26.326831+05:30 _gateway devname="PPFW02" from="contact@online.com" attachment="no"
============Following grok expression was used while creating the index========
%{TIMESTAMP_ISO8601:timestamp} _gateway devname=%{QUOTEDSTRING:devname}.*

===================================
The following ingest pipeline was used; it matched all 7 records of the log file:

PUT _ingest/pipeline/lp_indexttt-pipeline
{
  "description": "Ingest pipeline created by text structure finder",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{timestamp} _gateway %{kvmsg}"
      }
    },
    {
      "kv": {
        "field": "kvmsg",
        "field_split": " ",
        "value_split": "="
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": [
          "ISO8601"
        ],
        "output_format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSXXX"
      }
    },
    {
      "remove": {
        "field": "timestamp"
      }
    }
  ]
}

============================
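
(For reference, a single sample line can also be pushed through the pipeline by hand, without Filebeat, using the pipeline query parameter; the index name below is only an example:)

POST lp_indexttt/_doc?pipeline=lp_indexttt-pipeline
{
  "message": "2021-09-27T04:03:18.223319+05:30 _gateway devname=\"PPFW06\" from=\"aditya@gmail.com\" attachment=\"yes\""
}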

But it didn't show the additional fields (from and attachment, i.e., from="aditya@gmail.com" and attachment="yes") while viewing the index using Discover, even though it showed all 7 rows.

Why is it so?

Sir,
Actually, all fields were viewable, but only when I changed the options in the following way.

Under Kibana > Discover, after selecting the index pattern, for the field names under "Filter by type" the options "Aggregatable = yes" and "Searchable = yes" were selected.
Then the fields from (from="aditya@gmail.com") and attachment (attachment="no") were NOT listed.

Only when the options "Aggregatable = any" and "Searchable = any" were selected did the fields from (from="aditya@gmail.com") and attachment (attachment="no") get listed.

Now the issue is: HOW do I make these fields come under "Aggregatable = yes" and "Searchable = yes"?
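
For reference, the mapping of the two fields can be checked like this (the index name is only an example, replace it with the real one):

GET lp_indexttt/_mapping/field/from,attachment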
thanks for the support
shini
