Multiple grok pattern filters aren't filtering multiple logs in one Logstash file

Below is my pipeline.conf, where I want the filter block to apply three separate grok patterns to three different log files. Currently, it isn't working. Should I add multiple pipelines? Or create three different config files with one filter pattern each? Or is there another way to do it?

input {
  
  beats {
    port => 5044
  }
}

filter 
{
 if[fields][log_type] =="access"
  {
    grok 
    {
	match => {"message" => "%{DATESTAMP:timestamp} %{NONNEGINT:code} %{GREEDYDATA} %{LOGLEVEL} %{NONNEGINT:anum} %{GREEDYDATA} %{NONNEGINT:threadId}"}
    } 
  }else if [fields][log_type] == "errors" 
    {
        grok
        {
            match => { "message" => "%{DATESTAMP:timestamp} %{NONNEGINT:code} %{GREEDYDATA} %{LOGLEVEL} %{NONNEGINT:anum} %{GREEDYDATA:message}" }
        }
  }
  else if [fields][log_type] == "dispatch" 
  {
        grok 
        {
            match => { "message" => "\A%{TIMESTAMP_ISO8601:timestamp}%{SPACE}\[%{DATA:threadId}]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}%{JAVACLASS:javaClass}%{SPACE}-%{SPACE}?(\[%{NONNEGINT:incidentId}])%{GREEDYDATA:message}" }
        }
    }
}

output {
    elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    ilm_enabled => false
index => "%{[fields][log_type]}-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}

@Mehak_Bhargava

Please share the sample data.

Log for 1st grok-

08/10/2019 12:14:48 599   (null)                 DEBUG   27   GetUpdatedIncident for Incident Id 24749162 on thread 04fd1833-8275-46ff-816f-9acf0c1f7724:80759 on Thread 27
08/10/2019 12:14:48 600   (null)                 DEBUG   19   Updating cache with activity (152775689) Add Item:True Modify Item: False
08/10/2019 12:14:48 601   (null)                 DEBUG   67   Applying dynamic filter

Log for 2nd grok-

09/10/2019 12:38:09 741   (null)                 DEBUG   61   Filter : 
08/10/2019 12:38:09 742   (null)                 DEBUG   10   Add activty in cache (152782646)
08/10/2019 12:38:09 758   (null)                 DEBUG   10   Add incident activity to cache
08/10/2019 12:38:09 774   (null)                 DEBUG   61   Leftside Filter Expression : IncidentType=1 AND StatusCode="01" for User ZKH481F

Right now, the message without the filter is showing as -

08/10/2019 12:38:11 268   (null)                  INFO   62   Leftside Filter Expression : SubCategory="Cash Management" AND SourceProblemName="AOC ATM is Out of Cash per Servicer" for User NBK22RA Item Count : 2

Whereas I want the filter to be applied and show this in the message on Kibana -

{
  "code": "774",
  "anum": "61",
  "StatusCode": "01",
  "timestamp": "08/10/2019 12:38:09"
}
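For what it's worth, the second grok pattern can be sanity-checked against that line outside Logstash with plain regexes. This is a rough sketch: the character classes below are my own approximations of the grok definitions (DATESTAMP, NONNEGINT, LOGLEVEL), not the exact library patterns.

```python
import re

# Approximate regex equivalents of the grok tokens in the "errors" pattern:
# DATESTAMP -> dd/MM/yyyy HH:mm:ss, NONNEGINT -> \d+, LOGLEVEL -> alternation
pattern = re.compile(
    r'(?P<timestamp>\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2})\s+'
    r'(?P<code>\d+)\s+'
    r'.*?'                                          # the (null) column
    r'(?P<loglevel>TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\s+'
    r'(?P<anum>\d+)\s+'
    r'(?P<message>.*)')

line = ('08/10/2019 12:38:09 774   (null)                 DEBUG   61   '
        'Leftside Filter Expression : IncidentType=1 AND StatusCode="01" '
        'for User ZKH481F')

m = pattern.match(line)
print(m.group('timestamp'), m.group('code'), m.group('anum'))

# StatusCode is not captured by the pattern above; a lookbehind pulls it out
# of the remainder, the same idea as a custom oniguruma capture in grok:
status = re.search(r'(?<=StatusCode=")\d+', line)
print(status.group())
```

If this extracts the fields you expect, the grok pattern itself is plausible and the problem is more likely the conditional never matching (i.e. the [fields][log_type] value arriving from Filebeat).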

Log for 3rd grok-

2019-10-08 12:31:37,767 [pool-5-thread-47]      INFO   c.e.d.s.ScheduledActionProcessor - [24749750]EDI=NHA CustomFAULTSDEF: RR=NULL DispatchType=FLM RRDelay=0.0 RRThreshold=NULL DispatchWait=3 FaultSource=EMS HoldWhileServicing=false
2019-10-08 12:31:37,787 [pool-5-thread-77]      INFO   c.e.d.s.ScheduledActionProcessor - [24749536]QueryResult : {"partyDetails":[{"duration":{"baseValueMinutes":0},"template":{"id":10,"name":"EDI Template","description":"EDI Template","templateType":"other","jsonDefinition":"{\"body\":\"{\\\"MaxRetries\\\" : \\\"2\\\",\\\"Ttl\\\" : \\\"3600\\\"}\"}","tenantId":100,"channelId":77},"contactChannel":"EDI","atmSchedule":"AnyHours","level":"B. Service Team","contactType":"Notification","waitForNextContact":false,"contactMapping":"28-29-77-47-0","lifeCycle":"Close","users":[{"securityGroupName":null,"template":{"id":10,"name":"EDI Template","description":"EDI Template","templateType":"other","jsonDefinition":"{\"body\":\"{\\\"MaxRetries\\\" : \\\"2\\\",\\\"Ttl\\\" : \\\"3600\\\"}\"}","tenantId":100,"channelId":77},"lastName":null,"isAvailable":false,"address":null,"orgName":null,"nextAvailableTime":null,"timeZone":null,"userName":null,"userId":null,"orgId":"","firstName":null,"sequenceNo":2}]}]}
2019-10-08 12:31:37,774 [pool-5-thread-96]      INFO   c.e.d.s.s.SqlAdapterImpl - executeInternalAll;

Is this from the Logstash log?

I need the data that you are receiving from Filebeat, not the Logstash output.

@mancharagopan, this is the sample data in the files that Filebeat is reading.
Below is my filebeat.yml-

filebeat.inputs:
- 
  paths:
     - /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/log2.log
  enabled: true
  input_type: log
  fields:  
    log_type: access

-
  paths:
     - /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/logz.log
  enabled: true
  input_type: log
  fields:  
     log_type: errors

-
  paths:
     - /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/dispatcher-log.log
  enabled: true
  input_type: log
  fields:  
     log_type: dispatch
  
output.logstash:
  hosts: ["localhost:5044"]

This is the Filebeat configuration. I need the data from one of the log files.

Are you sure your problem isn't the spacing in your syntax? For example, the missing space between the if and the bracket?

Because, trying to replicate your scenario (on a 7.0.0 stack) with the following:

filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - .../access.log
  fields: 
    log_type: access

- type: log
  enabled: true
  paths:
    - .../errors.log
  fields: 
    log_type: errors

logstash.conf

input {
  beats {
    port => 5044
  }
}

filter {
  if [fields][log_type] == "access" {
    mutate {
      add_field => { "custom_type" => "access" }
    }
  } else if [fields][log_type] == "errors" {
    mutate {
      add_field => { "custom_type" => "errors" }
    }
  }
}

output { stdout{} }

access.log
08/10/2019 12:14:48 601 (null) DEBUG 67 Applying dynamic filter

errors.log
08/10/2019 12:38:09 742 (null) ERRORS 10 Add activty in cache (152782646)

I do get the right custom_type field in each of the logs.
Can you post the Logstash output without any filter, simply taking the data from the beats input?

P.S. alternatively try something like if "access" in [fields][log_type]
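For reference, that alternative conditional would slot into the filter block like this (a sketch using the same mutate test as above, not verified against your data):

```
filter {
  if "access" in [fields][log_type] {
    mutate {
      add_field => { "custom_type" => "access" }
    }
  }
}
```

Note that when the field is a string, the in operator is a substring check rather than an equality check, so it is more forgiving of stray whitespace in the field value.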

It worked with this config-

input {
  
  beats {
    port => 5044
  }
}

filter {
  if [fields][log_type] == "access" {
    grok {
      break_on_match => false
      match => {
        "message" => [
          "%{DATESTAMP:timestamp}%{SPACE}%{NONNEGINT:code}%{GREEDYDATA}%{LOGLEVEL}%{SPACE}%{NONNEGINT:anum}%{SPACE}%{GREEDYDATA:logmessage}",
          "(?<activityId>(?<=activity\s\()\d+)"
        ]
      }
    }
  } else if [fields][log_type] == "errors" {
    grok {
      break_on_match => false
      match => {
        "message" => [
          "%{DATESTAMP:timestamp}%{SPACE}%{NONNEGINT:code}%{GREEDYDATA}%{LOGLEVEL}%{SPACE}%{NONNEGINT:anum}%{SPACE}%{GREEDYDATA:logmessage}",
          "(?<statusCode>(?<=StatusCode=\")\d+)"
        ]
      }
    }
  } else if [fields][log_type] == "dispatch" {
    grok {
      break_on_match => false
      match => {
        "message" => [
          "\A%{TIMESTAMP_ISO8601:timestamp}%{SPACE}\[%{DATA:threadId}]%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}%{JAVACLASS:javaClass}%{SPACE}-%{SPACE}(\[%{NONNEGINT:incidentId}])?%{GREEDYDATA:message}",
          "(?<scheduledActionList>(?<=scheduledActionList\s\[)[\d,\s]+)"
        ]
      }
    }
    if "" in [scheduledActionList] {
      mutate {
        gsub => ["scheduledActionList", " ", ""]
        split => {"scheduledActionList" => ","}
      }
    }
  }
}



output {
    elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    ilm_enabled => false
    index => "%{[fields][log_type]}-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}
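The custom oniguruma-style captures in that working config (e.g. (?&lt;activityId&gt;(?&lt;=activity\s\()\d+)) are plain lookbehind regexes, so they can be checked outside Logstash too. A small sketch against the sample lines earlier in the thread, using Python's re in place of grok's engine (an approximation, and the scheduledActionList value below is illustrative):

```python
import re

access_line = ('08/10/2019 12:14:48 600   (null)                 DEBUG   19   '
               'Updating cache with activity (152775689) Add Item:True '
               'Modify Item: False')

# Same lookbehind as the second "access" pattern: the digits immediately
# following the literal text 'activity ('.
activity_id = re.search(r'(?<=activity\s\()\d+', access_line)
print(activity_id.group())  # 152775689

# The mutate gsub + split on scheduledActionList amounts to stripping
# spaces and splitting on commas, i.e.:
raw = '24749162, 24749536, 24749750'
action_list = raw.replace(' ', '').split(',')
print(action_list)  # ['24749162', '24749536', '24749750']
```

This mirrors why break_on_match => false matters in the working config: the first pattern fills the common fields, and the second, independent capture still runs to pull out the extra ID.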