Grok filter not being applied, but Logstash logs are being collected

I am passing logs from a server to Elasticsearch and they are indexed properly, but the grok filter I have applied is not extracting the information. The grok pattern works perfectly when I use it in the Grok Debugger against the log line, just not in the pipeline.conf file. Why?

input {
  beats {
    port => 5044
  }
}

filter {
  if [fields][tags] == "ngta-web" {
    grok {
      break_on_match => false
      match => {
        "message" => [
          "%{DATESTAMP:timestamp}%{SPACE}%{NONNEGINT:code}%{GREEDYDATA}%{LOGLEVEL}%{SPACE}%{NONNEGINT:anum}%{SPACE}%{GREEDYDATA:logmessage}"
        ]
      }
    }
  }
  else if [fields][tags] == "ngta-app" {
    grok {
      break_on_match => false
      match => {
        "message" => [
          "%{DATESTAMP:timestamp}%{SPACE}%{NONNEGINT:code}%{GREEDYDATA}%{LOGLEVEL}%{SPACE}%{NONNEGINT:anum}%{SPACE}%{GREEDYDATA:logmessage}"
        ]
      }
    }
  }
  else if [fields][tags] == "monitoring-app" {
    grok {
      break_on_match => false
      match => {
        "message" => [
          "%{DATESTAMP:timestamp}%{SPACE}%{NONNEGINT:code}%{GREEDYDATA}%{LOGLEVEL}%{SPACE}%{NONNEGINT:anum}%{SPACE}%{GREEDYDATA:logmessage}"
        ]
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    ilm_enabled => false
    index => "%{[fields][tags]}"
  }
  stdout {
    codec => rubydebug
  }
}


Hi there,

can you please try adding another filter (an add_field with a dummy value) inside each of the if/else if clauses? Just to see whether you are actually entering those blocks.

@Fabio-sama
I tried adding the add_field in the config as below, but Logstash threw an error and didn't run it.

filter {
  if[fields][tags] =="ngta-web" { 
  add_field => { "host" => "%{[event_data][IpAddress]}
    grok {
      break_on_match => false
      match => {
	"message" => [
	   "%{DATESTAMP:timestamp}%{SPACE}%{GREEDYDATA}%{SPACE}%{LOGLEVEL:loglevel}%{SPACE}%{JAVACLASS:javaClass}%{GREEDYDATA:logmessage}"
         ]
      }
    }
  }

The error is:

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:test, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, { at line 10, column 13 (byte 123) after filter {\n  if[fields][tags] ==\"ngta-web\" { \n  add_field ", :backtrace=>["/home/mehak/Documents/logstash-7.4.0/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/home/mehak/Documents/logstash-7.4.0/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/home/mehak/Documents/logstash-7.4.0/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2584:in `map'", "/home/mehak/Documents/logstash-7.4.0/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:153:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/home/mehak/Documents/logstash-7.4.0/logstash-core/lib/logstash/java_pipeline.rb:26:in `initialize'", "/home/mehak/Documents/logstash-7.4.0/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/home/mehak/Documents/logstash-7.4.0/logstash-core/lib/logstash/agent.rb:326:in `block in converge_state'"]}

That's because add_field is an option that has to go inside a mutate filter; it is not a standalone filter. Check the docs for the correct syntax and try again :wink:
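For instance, something like this (a minimal sketch; debug_marker is just a dummy field name I picked):

filter {
  if [fields][tags] == "ngta-web" {
    mutate {
      # dummy field, only to verify this block is actually entered
      add_field => { "debug_marker" => "entered-ngta-web" }
    }
  }
}

If debug_marker shows up in Kibana, the block is entered and the problem is in the grok; if not, the condition itself never matches.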

@Fabio-sama Got it. Yes, I have now added the add_field, but on the Kibana page I don't see the host field added. It seems these blocks aren't being entered. What is the solution?

filter {
  if [fields][tags] == "ngta-web" {
    mutate {
      add_field => [ "[host]", "%{[event_data][IpAddress]}" ]
    }
    grok {
      break_on_match => false
      match => {
        "message" => [
          "%{DATESTAMP:timestamp}%{SPACE}%{GREEDYDATA}%{SPACE}%{LOGLEVEL:loglevel}%{SPACE}%{JAVACLASS:javaClass}%{GREEDYDATA:logmessage}"
        ]
      }
    }
  }

Ok so, in order to see what Logstash is receiving (and parsing) and why it doesn't enter those blocks, can you please post here what is printed to stdout if you do not apply any filter? Like running this pipeline:

input {
  beats {
    port => 5044
  }
}

filter {}

output {
  stdout {}
}

@Fabio-sama,

I ran this pipeline and in the Logstash terminal I see this data printed in a loop.

},
       "message" => "08/10/2019 12:22:21 291   (null)                  INFO   68   Leftside Filter Expression : SubCategory=\"ANS Management\" AND SourceProblemName=\"8893 Initialize Mass Hamiliton Keys\" for User ZK0DUBO Item Count : 2",
         "agent" => {
        "ephemeral_id" => "623c4d83-8ea9-4456-b9b6-98bf9da622fb",
            "hostname" => "mehak-VirtualBox",
                "type" => "filebeat",
             "version" => "7.4.0",
                  "id" => "bad135c8-d359-4936-b515-79eb4bb24630"
    },
           "ecs" => {
        "version" => "1.1.0"
    }
}
{
          "host" => {
        "name" => "mehak-VirtualBox"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
      "@version" => "1",
    "@timestamp" => 2020-02-29T02:49:47.330Z,
        "fields" => {
        "log_type" => "trial_index"
    },
           "log" => {
          "file" => {
            "path" => "/home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/log2.log"
        },
        "offset" => 17233286
    },
       "message" => "08/10/2019 12:22:21 291   (null)                 DEBUG   68   Leftside Filter Expression : SubCategory=\"ANS Management\" AND SourceProblemName=\"8894 Burglary at ATM=\" for User ZK0DUBO",
         "agent" => {
        "ephemeral_id" => "623c4d83-8ea9-4456-b9b6-98bf9da622fb",
            "hostname" => "mehak-VirtualBox",
                "type" => "filebeat",
             "version" => "7.4.0",
                  "id" => "bad135c8-d359-4936-b515-79eb4bb24630"
    },

And the Filebeat terminal shows this:

},
  "message": "08/10/2019 12:22:29 685   (null)                 DEBUG   10   IsActionsAllowedOnEntity - Action : CloseIncident",
  "fields": {
    "log_type": "trial_index"
  },
  "ecs": {
    "version": "1.1.0"
  },
  "host": {
    "name": "mehak-VirtualBox"
  }
}
2020-02-28T18:49:48.577-0800	DEBUG	[processors]	processing/processors.go:183	Publish event: {
  "@timestamp": "2020-02-29T02:49:48.577Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.4.0"
  },
  "host": {
    "name": "mehak-VirtualBox"
  },
  "agent": {
    "type": "filebeat",
    "ephemeral_id": "623c4d83-8ea9-4456-b9b6-98bf9da622fb",
    "hostname": "mehak-VirtualBox",
    "id": "bad135c8-d359-4936-b515-79eb4bb24630",
    "version": "7.4.0"
  },
  "message": "08/10/2019 12:22:29 703   (null)                 DEBUG   61   Add activty in cache (152778101)",
  "log": {
    "file": {
      "path": "/home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/log2.log"
    },
    "offset": 17530133
  },
  "fields": {
    "log_type": "trial_index"
  },
  "ecs": {
    "version": "1.1.0"
  }
}

Since I am not outputting to Elasticsearch, I don't see an index created on it.

Since I am not outputting to Elasticsearch, I don't see an index created on it.

Obviously.

Well, as you can see there, there's no [fields][tags] field in your output document, which is why you never enter those if blocks. So, are you able to find any log printed to stdout carrying one of the values you're setting the conditions on (i.e. ngta-web or ngta-app)?

It'd be useful to see where it is putting those values that you expect to be in [fields][tags].

Also, can you share your filebeat.yml configuration?

Finally, since your [tags] field (whether it is nested under [fields] or a root field) is most likely an array of values, the way you are making that check is not correct: you are checking whether an array is equal to a string, whereas you should check whether the tags array CONTAINS that string.
A proper check would be something like

if "ngta-web" in [fields][tags] {
...
}

obviously assuming the tags array you want to check on is nested under [fields]; if it is a root field, the check would be if "ngta-web" in [tags] instead.

So, share the filebeat.yml if you can, check where Filebeat is putting your strings of interest, and change that condition accordingly.
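For reference, tags and custom fields are typically defined per input in filebeat.yml, roughly like this (just a sketch of the usual layout, not your actual config; the path is taken from your logs above):

filebeat.inputs:
  - type: log
    paths:
      - /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/*.log
    # a top-level "tags" setting ends up in the root [tags] array ...
    tags: ["ngta-web"]
    # ... while entries under "fields" end up nested in [fields]
    fields:
      log_type: trial_index

Depending on which of the two you use, the Logstash condition has to reference [tags] or [fields][log_type] (or whatever key you chose) accordingly.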

The indexes are created properly after fixing the fields/tags syntax. Both grok matches work: I get filename, plus timestamp, code, anum, and logmessage as fields.

But the commented-out if line below doesn't work. How can I add that conditional? Should this if statement be outside the if [fields][log_type] block?

#if [log][file][path] == "log2.log" {
filter {
  if [fields][log_type] == "obapp-dotnet" {
    grok {
      #break_on_match => true
      match => { "[log][file][path]" => "%{GREEDYDATA}/%{GREEDYDATA:filename}\.log" }
    }
    #if [log][file][path] == "log2.log" {
    grok {
      match => {
        "message" => [
          "%{DATESTAMP:timestamp}%{SPACE}%{NONNEGINT:code}%{GREEDYDATA}%{LOGLEVEL:loglevel}%{SPACE}%{NONNEGINT:anum}%{SPACE}%{GREEDYDATA:logmessage}"
        ]
      }
    }
    #}
  }

Is there any chance that [log][file][path] is never equal to log2.log?

In your sample above, I see it equal to /home/mehak/Documents/filebeat-7.4.0-linux-x86_64/logs/log2.log. You do extract the filename log2 with the previous grok, so why use the whole [log][file][path] again?

Why not use if [filename] == "log2" instead?
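I.e., putting it together, something like this (a sketch reusing your own patterns from above):

filter {
  if [fields][log_type] == "obapp-dotnet" {
    grok {
      # extracts e.g. "log2" from ".../logs/log2.log" into [filename]
      match => { "[log][file][path]" => "%{GREEDYDATA}/%{GREEDYDATA:filename}\.log" }
    }
    if [filename] == "log2" {
      grok {
        match => {
          "message" => [
            "%{DATESTAMP:timestamp}%{SPACE}%{NONNEGINT:code}%{GREEDYDATA}%{LOGLEVEL:loglevel}%{SPACE}%{NONNEGINT:anum}%{SPACE}%{GREEDYDATA:logmessage}"
          ]
        }
      }
    }
  }
}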
