Grok filter for log files in Logstash

input {
  beats {
    port => 5044
  }
}

filter {
  if [log_type] == "access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [log_type] == "errors" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [log_type] == "dispatcher" {
    grok {
      match => { "message" => "\A%{TIMESTAMP_ISO8601:timestamp}%{SPACE}\[%{DATA:threadId}]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}%{JAVACLASS:javaClass}%{SPACE}-%{SPACE}?(\[%{NONNEGINT:incidentId}])%{GREEDYDATA:message}" }
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    ilm_enabled => false
    index => "%{log_type}-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}

Below are the three log types I want to filter and extract information from. Above is the config file with the match patterns.

Log Type 1: 08/10/2019 12:14:48 599   (null)                 DEBUG   27   GetUpdatedIncident for Incident Id 24749162 on thread

Log type 2: 08/10/2019 12:38:09 742   (null)                 DEBUG   10   Add activty in cache (152782646)

Log type 3: 2019-10-08 12:31:37,767 [pool-5-thread-47]      INFO   c.e.d.s.ScheduledActionProcessor - [24749750]EDI=NHA CustomFAULTSDEF: RR=NULL DispatchType=FLM RRDelay=0.0 RRThreshold=NULL DispatchWait=3 FaultSource=EMS HoldWhileServicing=false
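
For reference, this is how I expect the dispatcher pattern above to line up against the Log type 3 sample (a sketch, assuming the stock grok pattern definitions):

2019-10-08 12:31:37,767               -> %{TIMESTAMP_ISO8601:timestamp}
[pool-5-thread-47]                    -> \[%{DATA:threadId}]
INFO                                  -> %{LOGLEVEL:logLevel}
c.e.d.s.ScheduledActionProcessor      -> %{JAVACLASS:javaClass}
-                                     -> literal hyphen
[24749750]                            -> (\[%{NONNEGINT:incidentId}])
EDI=NHA ... HoldWhileServicing=false  -> %{GREEDYDATA:message}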

I would like Log type 3 to come out as below in Kibana after the filter block in logstash.conf is applied, but it isn't. Why?

{
  "threadId": "pool-5-thread-47",
  "logLevel": "INFO",
  "javaClass": "c.e.d.s.ScheduledActionProcessor",
  "incidentId": "24749750",
  "message": "EDI=NHA CustomFAULTSDEF: RR=NULL DispatchType=FLM RRDelay=0.0 RRThreshold=NULL DispatchWait=3 FaultSource=EMS HoldWhileServicing=false",
  "timestamp": "2019-10-08 12:31:37,767"
}

What does it come out like? What is the value of [log_type]? Is there a _grokparsefailure tag? Which index is the document in?

Hi @Badger,
It comes out as this:

Dec 30, 2019 @ 12:39:28.646

agent.ephemeral_id: 00463acf-e130-4092-bb39-55fe18dbceaa
agent.hostname: mehak-VirtualBox
agent.version: 7.4.0
agent.type: filebeat
agent.id: bad135c8-d359-4936-b515-79eb4bb24630
@version: 1
host.name: mehak-VirtualBox
ecs.version: 1.1.0
log.offset: 41,656,653
log.file.path: /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/log2.log
fields.log_type: access
@timestamp: Dec 30, 2019 @ 12:39:28.646
tags: beats_input_codec_plain_applied
message: ExpectedFixDT: 08/10/2019 16:07,
type: another_test
_id: MM-

and after adding the message field and fields.log_type as columns:

Dec 30, 2019 @ 12:39:28.646  ExpectedFixDT: 08/10/2019 16:07,
access
Dec 30, 2019 @ 12:39:28.646  IncidentNo: 7928109745,
access
Dec 30, 2019 @ 12:39:28.647  ActivityDT: 08/10/2019 13:07,
access

The value of log_type is defined in the filebeat.yml file as follows:

filebeat.inputs:
- paths:
    - /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/log2.log
  enabled: true
  input_type: log
  fields:
    log_type: access

- paths:
    - /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/logz.log
  enabled: true
  input_type: log
  fields:
    log_type: errors

- paths:
    - /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/dispatcher-log.log
  enabled: true
  input_type: log
  fields:
    log_type: dispatch
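    # note: this value is "dispatch", while the logstash conditional tests for "dispatcher"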
  
output.logstash:
  hosts: ["localhost:5044"]

There is no _grokparsefailure tag.

Do you mean where is it defined? It is defined in logstash.conf.

Your conditionals are testing a field called [log_type]. However, your events have

fields.log_type: access

that is, a field called [fields][log_type]. Either change the conditionals, or use the fields_under_root option in filebeat.

filter {
  if [fields][log_type] == "access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [fields][log_type] == "errors" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [fields][log_type] == "dispatcher" {
    grok {
      match => { "message" => "\A%{TIMESTAMP_ISO8601:timestamp}%{SPACE}\[%{DATA:threadId}]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}%{JAVACLASS:javaClass}%{SPACE}-%{SPACE}?(\[%{NONNEGINT:incidentId}])%{GREEDYDATA:message}" }
    }
  }
}
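
Or, with fields_under_root (one or the other, not both), a minimal sketch of one filebeat input would be:

filebeat.inputs:
- paths:
    - /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/dispatcher-log.log
  enabled: true
  input_type: log
  fields:
    log_type: dispatch
  fields_under_root: true   # puts log_type at the event root, so the conditionals can test [log_type]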

Updated the conditionals to [fields][log_type] in the file, and now the output is below for Timestamp, message, and fields.log_type:

Dec 30, 2019 @ 14:29:27.105  08/10/2019 12:17:21 755   (null)                  INFO   24   Leftside Filter Expression : SubCategory="Servicing" for User ZKL15VT Item Count : 179
access

Dec 30, 2019 @ 14:29:27.114  08/10/2019 12:17:22 012   (null)                  INFO   24   Leftside Filter Expression : SubCategory="Dispenser" AND SourceProblemName="Degraded" for User ZKL15VT Item Count : 63
access

But now the errors and dispatcher logs aren't showing up!

Hi @Badger, even after fixing the conditional below, the required text isn't being filtered properly. Any suggestions?

@Badger,

Is this fields setup right, or should I add the fields_under_root option too?

input {
  beats {
    port => 5044
  }
}

filter {
  if [fields][log_type] == "access" {
    grok {
      match => { "message" => "%{DATESTAMP:timestamp} %{NONNEGINT:code} %{GREEDYDATA} %{LOGLEVEL} %{NONNEGINT:anum} %{GREEDYDATA} %{NONNEGINT:threadId}" }
    }
  } else if [fields][log_type] == "errors" {
    grok {
      match => { "message" => "%{DATESTAMP:timestamp} %{NONNEGINT:code} %{GREEDYDATA} %{LOGLEVEL} %{NONNEGINT:anum} %{GREEDYDATA:message}" }
    }
  } else if [fields][log_type] == "dispatch" {
    grok {
      match => { "message" => "\A%{TIMESTAMP_ISO8601:timestamp}%{SPACE}\[%{DATA:threadId}]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}%{JAVACLASS:javaClass}%{SPACE}-%{SPACE}?(\[%{NONNEGINT:incidentId}])%{GREEDYDATA:message}" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    ilm_enabled => false
    index => "%[fields][log_type]-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}

It is one or the other, it will not work if you do both.

I have not added the fields_under_root option, and the grok pattern is still not being applied to the message.
Thanks, @Badger

You need to look at the data. I suggest you add

output { stdout { codec => rubydebug } }

and see what a dispatch event looks like.

Is this not the same as what you are suggesting? What is a dispatch event?

By dispatch event I mean one in which you expect [fields][log_type] to contain "dispatch".
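
In the rubydebug output on stdout, such an event should look roughly like this (a sketch; the exact fields will vary):

{
    "@timestamp" => 2019-12-30T20:39:28.646Z,
       "message" => "2019-10-08 12:31:37,767 [pool-5-thread-47]      INFO   c.e.d.s.ScheduledActionProcessor - [24749750]EDI=NHA ...",
        "fields" => {
        "log_type" => "dispatch"
    }
}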

Currently, only the access- and errors-tagged logs are showing up, and not dispatch.

Also, I am considering adding multiple pipelines and making three config files, each with its own filter and its own output: https://www.elastic.co/guide/en/logstash/7.5/multiple-pipelines.html
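
A sketch of what pipelines.yml could look like for that (the .conf paths are hypothetical):

- pipeline.id: access
  path.config: "/home/mehak/logstash/access.conf"
- pipeline.id: errors
  path.config: "/home/mehak/logstash/errors.conf"
- pipeline.id: dispatch
  path.config: "/home/mehak/logstash/dispatch.conf"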

@Badger, Made progress!!

after fixing the index error, Kibana actually enters the index name as

access-2020.01.10

which is how I wanted the index name to be. It is creating three different indices now, as expected. Thank you! I just needed to reload the files.
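
For reference, the index error was presumably the sprintf syntax: the whole field reference has to sit inside the braces, i.e.

index => "%{[fields][log_type]}-%{+YYYY.MM.dd}"

instead of "%[fields][log_type]-%{+YYYY.MM.dd}".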

But the actual discussion is about the grok patterns used to filter the logs, which still isn't working. Why is that? Any suggestions?
