Logstash filter string anywhere

Hi there,

I want to filter firewall logs if some specific string matches anywhere in the message and do not know exactly how to do it.

Sample Log message:
{"zone_src":"SOURCE","zone_dst":"EXTERNAL","reason":"rule","rule_id":123456,"rule_description":"Description","action":"ACCEPT","@timestamp":"2022-09-XXT0X:52:12.123123+0200","timestamp":12345678,"timestamp_usec":123123,"iface_in":"eth11.1111","iface_out":"eth2.111","ip_src":"1.1.1.1","ip_dst":"2.2.2.2","protocol":6,"port_src":54321,"port_dst":2345,"mark":17825392,"tos":1,"host_id":164119,"host_name":"hostname","logtype":"forward"}

I want to get all Logs containing "zone_dst":"EXTERNAL"

I tried it with:

filter {
        grok {
           match => { "message" => "("zone_dst":"External")"
                 }
        }

but that doesn't work.

Does anyone of you can assist me?

Thank you very much!

Max

If your logs are always in JSON format you may have better luck using the JSON filter plugin: JSON filter plugin | Logstash Reference [8.4] | Elastic

Example:

filter {
  json {
    source => "message"
  }
}

You can then use a conditional to do something if the value is External EG:

if [zone_dst] == "External" {
  #do something
}

Alternatively, if you only want to ship events with the value of 'External' you could invert the above conditional and use the Drop filter plugin (Drop filter plugin | Logstash Reference [8.4] | Elastic):

if [zone_dst] != "External" {
  drop { }
}

Hi Steve,

I tried it like this, but I got an configuration error:

input {
        tcp {
                port => 10516
                type => syslog
        }
}
filter {
        json {
           source =>  "message"
                if [zone_dst] != "External" {
                drop {}
                }
        }
kv {
           source => "message"
           field_split => ","
           value_split => ":"
           trim_key => "{\\\""
           trim_value => "\\\"}"
           #target_field => "details"
           include_brackets => false
    }
}

The error is:

[2022-09-16T11:30:04,199][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "=>" at line 10, column 6 (byte 101) after filter {\n\tjson {\n\t source => "message"\n\t\tif ",

You cannot have an IF conditiona inside the json filter.

You also do not need the kv filter, the json filter will parse your message.

You just need this:

input {
        tcp {
                port => 10516
                type => syslog
        }
}
filter {
        json {
           source =>  "message"
        }
        if [zone_dst] != "EXTERNAL" {
            drop {}
        }
}

In your log you have EXTERNAL, not External, so you need to match the exact value.

Hi Leandro,

thank you for your hints.
Now logstash starts correctly, but it seems that it doesn't recognize the message as json:

[2022-09-16T22:17:02,018][WARN ][logstash.filters.json    ][main][9bee16fd531aafb8069711ca7a54cf6070ab1d5419a8d3b5ca4b7cd7cb23f174] Error parsing json {:source=>"message", :raw=>"528 <13>1 2022-09-16T22:17:01+02:00 1.1.1.1 REPLACED - - - {\"zone_src\":\"REPLACED\",\"zone_dst\":\"EXTERNAL\",\"reason\":\"rule\",\"rule_id\":12345,\"rule_description\":\"REPLACED\",\"action\":\"ACCEPT\",\"@timestamp\":\"2022-09-16T22:17:00.912286+0200\",\"timestamp\":1663359420,\"timestamp_usec\":912286,\"iface_in\":\"eth12.345\",\"iface_out\":\"eth67.890\",\"ip_src\":\"2.2.2.2\",\"ip_dst\":\"3.3.3.3\",\"protocol\":17,\"port_src\":12345,\"port_dst\":12345,\"mark\":17825792,\"tos\":0,\"host_id\":164119,\"host_name\":\"REPLACED\",\"logtype\":\"forward\"}", :exception=>#<LogStash::Json::ParserError: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')```

It seems that syslog-ng adds the "<13>" before sending it to logstash.

Here is my syslog-ng conf regarding logstash:

source s_network_test {
  network(
    ip("0.0.0.0")
    log_iw_size(10000)
    log_fetch_limit(1000)
    transport("udp")
    port(514)
    flags(dont-store-legacy-msghdr,no-parse));
};

rewrite r_test { set("test", value("PROGRAM")); };

destination d_logstash_test {
  syslog("X.X.X.X" transport("tcp") port(12345)
        template("$(format-json --scope dot-nv-pairs)")
  );
 };

log {
source(s_network_test);
rewrite(r_test);
destination(d_logstash_test);
flags(flow-control);
};

Thank you so much!

BR,
Max

Can you post a full and complete log line redacting any sensitive information? You may want to look at Syslog input plugin | Logstash Reference [8.4] | Elastic if you are sending syslogs, you can then parse the syslog and then run the JSON filter against the syslog message rather than the full line