If condition in logstash output doesn't work

The config file is shown as below

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    group_id => "log_monitor"
    auto_offset_reset => "latest"
    consumer_threads => 1
    topics => ["test_log"]
  }
}

filter{
}
 
output {
  **if [severity] =~ "err"{**
     kafka{
      topic_id => "filtered_log"
  }

}
}

input data example is shown as below, which is in json format.

{"appname":"jesseddy","facility":"kern","hostname":"for.net","message":"Pretty pretty pretty good","msgid":"ID902","procid":6810,"severity":"err","timestamp":"2022-08-03T08:07:41.909Z","version":2}

All I want to achieve is to only push the data with severity is "err" into a new topic of kafka. But the above configuration file doesn' t work. I can not not consume any data from kafka.

If I remove the if condition in output, I can successfully consume all the data. Therefore maybe I wrote the wrong if condition. Anyone can help plz?

Comment if need more information, thanks.

If "err" is only a value, not error, ERR, then you can use:

if [severity] == "err" {...}

Hi, Rios

Yes, "err" is only a value in json. I changed the configuration as you provided, but still can not consume any data in kafka. It seems somehow dropped all the data .

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    group_id => "log_monitor"
    auto_offset_reset => "latest"
    consumer_threads => 1
    topics => ["test_log"]
  }
}

filter{
}
 
output {
  if [severity] == "err"{
   kafka{
    topic_id => "filtered_log"
  }

}
}

New update.

I achieved what I need by using this configuration.

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    group_id => "log_monitor"
    auto_offset_reset => "latest"
    consumer_threads => 1
    topics => ["test_log"]
  }
}

filter{

}
 
output {
  **if [message] =~ /err/{**
   kafka{
    topic_id => "filtered_log"
  }

}
}

Although I achieved what I need, but I am still confused. If we look at my input data example, we can see that "err" is a value in "severity" not in "message". So why I can filter out the data by using 'if [message] =~ "err" ' ?
:face_with_spiral_eyes:

The [message] field contains the serialized JSON. Unless your parse that using a json filter you will not have a [severity] field.

1 Like

Thank you.

Is there any other default field other than [message]? Where should I look for this kind of information to study?(Totally new to logstash). Thanks in advance!

Take a look into Accessing event data and fields | Logstash Reference [8.3] | Elastic and see some explanations about fields in Logstash. Metadata could also be useful for you.

1 Like

If you search the message filed for "err" you might get some like {..."message":"An error occured",..}

I would use:

  • JSON plugin to convert the structure to the fields, as Bager recommend
  • Use grok to get a value from the severity field

Add to filter:

filter {
 grok {
	 match => { "message" => "\"severity\":\"%{LOGLEVEL:[@metadata][severity]}\"" }
	 }
}
output {
  if [@metadata][severity] == "err" { ... }
}

Also you can use regex on the field:
if [@metadata][severity] =~ /err/

1 Like