Filebeat Filtering - Drop Event when NOT contain field that equals a value

Hi all,

I need your help in order to filter some logs. What I need to do is to drop the events of all my logs that don't have an alert object in them with a severity of 3. I want to save in Elasticsearch only those that have a severity of 3. The rest of the logs, those that don't have an alert object or a severity of 3, I want to have dropped and not saved within ES.

PS: Some of the JSON logs have no alert object. Just FYI.

The log looks like this:

    {
        "timestamp": "ZZZZ",
        ...
        "alert": {
            ...
            "severity": "3"
            ...
        }
        ...
    }

What I tried until now is:

    processors:
      - drop_event:
          when:
            not.contains:
              alert: "*"

Or

    processors:
      - drop_event:
          when:
            not.regexp:
              severity: "[2-3]"

The field after the pipeline becomes event.severity=3
10x!

Are you already parsing the string as JSON? If so, try something like below:

    processors:
      - drop_event:
          when:
            or:
              - equal:
                  alert: null
              - not.regexp:
                  alert.severity: "[2-3]"

Thank you Alex!
I will give it a try tomorrow.

Yes, the log is already exported as JSON, so I need to save only those that follow the rule!

Cheers!

Thank you! It is not working. I receive in the logs "Pipeline client receives callback on 'onFilteredOut' for event" and no index is created, hence no data is saved in ES. It is like it is not able to find the xpath of the object.

Are there any other processors being run? Can you post your entire Filebeat config? Can you post a sample log that you're trying to parse?

Hi Alex,

Please find below an example of the original log that Filebeat is taking from my tool.

I don't know when the preprocessing is done. If it is done before the pipeline, from the original log I need only those that have alert.severity = 3.

    {
        "timestamp": "2021-05-26T12:00:59.402359+0300",
        "flow_id": 529848509194817,
        "in_iface": "code",
        "event_type": "alert",
        "proto": "TCP",
        "metadata": {
            "flowbits": [
                "tcp.retransmission.alerted"
            ],
            "flowints": {
                "tcp.retransmission.count": 28
            }
        },
        "alert": {
            "action": "allowed",
            "gid": 1,
            "signature_id": 2210056,
            "rev": 1,
            "severity": 3
        },
        "app_proto": "tls",
        "flow": {
            "pkts_toserver": 90197,
            "pkts_toclient": 99534,
            "bytes_toserver": 5593508,
            "bytes_toclient": 30748923,
            "start": "2021-05-26T10:26:36.647745+0300"
        }
    }

If the preprocessing is done after this event has been taken, then I need snort.code.alert.severity=3.

This is how the event looks in the debugger log event.

    pipeline/client.go: 231  Pipeline client receives callback 'onFilteredOut' for event: {Timestamp: 2021-05-28 07: 01: 34.364773 +0000 UTC Meta: {
            "pipeline": "filebeat-7.13.0-pipeline"
        } Fields: {
            "agent": {
                "type": "filebeat",
                "version": "7.13.0"
            },
            "ecs": {
                "version": "1.9.0"
            },
            "fileset": {
                "name": "code"
            },
            "input": {
                "type": "log"
            },
            "network": {
                "community_id": "1:code=",
                "transport": "TCP"
            },
            "snort": {
                "code": {
                    "alert": {
                        "gid": 1,
                        "rev": 2,
                        "severity": 3,
                    },
                    "app_proto": "rdp",
                    "event_type": "alert",
                    "flow": {
                        "bytes_toclient": 14274883,
                        "bytes_toserver": 473802,
                        "pkts_toclient": 16485,
                        "pkts_toserver": 7436,
                        "start": "2021-05-28T08:52:06.872123+0300"
                    },
                    "metadata": {
                        "flowbits": [
                            "ms.rdp.established",
                            "tcp.retransmission.alerted"
                        ],
                        "flowints": {
                            "tcp.retransmission.count": 122
                        }
                    }
                }
            },
        }  TimeSeries: false
    }

This is my filebeat.yml

    filebeat.config:
      modules:
        path: ${path.config}/modules.d/*.yml
        reload.enabled: true

    filebeat.modules:
    - module: snort
      code:
        enabled: true
        var.paths: ["log*.json"]

    processors:
    - add_cloud_metadata: ~
    - drop_event:
        when:
          or:
          - not.regexp.severity: "[1-3]"
          - not.regexp.snort.code.alert.severity: "[1-3]"

    setup:
      template.settings.index:
        number_of_shards: 1
        number_of_replicas: 1
      kibana.host: "kibana:5601"

    output.elasticsearch:
      hosts: ["http://es01:9200"]

Any help is really appreciated!

Thank you!

Did you create a custom fileset for the Snort module?

Yes, these processors will execute after the module runs its processors. So if you only want to keep severity=3, the below should work.

    - drop_event:
        when:
          not:
            equal:
              snort.code.alert.severity: 3

Thank you!
But why is it not working with a regexp 2 to 3?
It is working with equal.

    - drop_event:
        when:
          not:
            regexp:
              snort.code.alert.severity: "[2-3]"

I don't know. I'd have to look into the regex processor.
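As an added illustration (not code from this thread, and not Filebeat's own implementation), the following Python script simulates how a drop_event "when" clause is evaluated over a few constructed events shaped like the snort module output posted above. Two behaviours are modelled as assumptions: `equals` is a typed comparison that is false when the field is missing, and `regexp` only matches string-valued fields, so an integer severity never matches (which is consistent with every event being filtered out in the regexp attempts above). The condition names follow Filebeat's documentation (`equals`, `regexp`, `not`, `or`, `and`); the helper names, the sentinel and the sample events are mine.

```python
import json
import re

# Constructed sample events shaped like the snort module output in the thread
# (fields nested under snort.code.*). Only the first should survive a
# "keep severity 3" filter. The last one carries severity as the string "3",
# as in the raw log at the top of the thread, to show why the field type matters.
SAMPLE_EVENTS = [
    json.loads('{"snort": {"code": {"event_type": "alert", "alert": {"gid": 1, "severity": 3}}}}'),
    json.loads('{"snort": {"code": {"event_type": "alert", "alert": {"gid": 1, "severity": 2}}}}'),
    json.loads('{"snort": {"code": {"event_type": "flow", "flow": {"pkts_toserver": 10}}}}'),
    json.loads('{"snort": {"code": {"event_type": "alert", "alert": {"gid": 1, "severity": "3"}}}}'),
]

_MISSING = object()  # sentinel: the dotted field path is not present in the event


def get_field(event, path):
    """Resolve a dotted field path such as 'snort.code.alert.severity'."""
    cur = event
    for part in path.split("."):
        if not isinstance(cur, dict) or part not in cur:
            return _MISSING
        cur = cur[part]
    return cur


def cond_equals(event, field, expected):
    """Assumption: 'equals' is a typed comparison. A missing field is false,
    and the string "3" does not equal the integer 3."""
    value = get_field(event, field)
    return value is not _MISSING and value == expected


def cond_regexp(event, field, pattern):
    """Assumption: 'regexp' only matches string-valued fields. An integer or
    missing field evaluates to false, which is why a 'not.regexp' on the
    integer severity produced by the module drops every event."""
    value = get_field(event, field)
    return isinstance(value, str) and re.search(pattern, value) is not None


def evaluate(event, condition):
    """Evaluate a 'when' clause written as a nested dict using the condition
    names from Filebeat's docs: not, or, and, equals, regexp."""
    (name, body), = condition.items()
    if name == "not":
        return not evaluate(event, body)
    if name == "or":
        return any(evaluate(event, c) for c in body)
    if name == "and":
        return all(evaluate(event, c) for c in body)
    (field, arg), = body.items()
    if name == "equals":
        return cond_equals(event, field, arg)
    if name == "regexp":
        return cond_regexp(event, field, arg)
    raise ValueError(f"unsupported condition: {name!r}")


def apply_drop_event(events, when):
    """drop_event discards each event for which the 'when' condition is true."""
    return [e for e in events if not evaluate(e, when)]


# The clause that worked in the thread: keep only severity == 3. Events with
# no alert object fail 'equals', so 'not' makes them dropped as well.
KEEP_SEVERITY_3 = {"not": {"equals": {"snort.code.alert.severity": 3}}}

# The regexp variant from the thread. On an integer severity the match is
# false, 'not' turns that into true, and the event is dropped.
REGEXP_2_TO_3 = {"not": {"regexp": {"snort.code.alert.severity": "[2-3]"}}}


def kept_severities(when, events=SAMPLE_EVENTS):
    """Severity values of the events that survive the given drop_event clause."""
    return [get_field(e, "snort.code.alert.severity") for e in apply_drop_event(events, when)]


if __name__ == "__main__":
    print("equals 3     ->", kept_severities(KEEP_SEVERITY_3))  # [3]
    print("regexp [2-3] ->", kept_severities(REGEXP_2_TO_3))    # ['3']
```

Running it shows the split the thread observed: the `equals` clause keeps exactly the integer-severity-3 event and drops the alert-less and severity-2 events, while the regexp clause keeps only the event whose severity is a string, which is why the same pattern behaves differently on the raw log (string "3") and on the module's parsed output (integer 3).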
