Filter out (drop) a log sent to Logstash, based on the values of its fields

Hello,

I use a pipeline to ingest logs generated by suricata, everything set up with filebeat setup --pipelines --modules suricata,<other_modules>. I don't want to ingest any "event" logs, only "alert" logs. So I added the following to the "filter" section of my conf.d/my-logstash.conf:

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/input.crt"
    ssl_key => "/etc/logstash/input.key"
  }
}

filter {
[...]
  # we block all the events (not alerts) from suricata
  if ([event][dataset] == "suricata.eve" and [event][kind] == "event") {
    drop { }
  }
[...]
}

output {
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => "REDACTED"
      manage_template => false
      index => "%{[@metadata][pipeline]}-%{+YYYY.MM.dd}"
      pipeline => "%{[@metadata][pipeline]}"
      user => "REDACTED"
      password => "REDACTED"
    }
  } else {
    elasticsearch {
      hosts => "REDACTED"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      user => "REDACTED"
      password => "REDACTED"
    }
  }
}

A sample log that should not be ingested, but is:

{
  "_index": "filebeat-7.17.5-suricata-eve-pipeline-2022.07.26",
  "_type": "_doc",
  "_id": "REDACTED",
  "_version": 1,
  "_score": 1,
  "_ignored": [
    "event.original.keyword"
  ],
  "_source": {
    "agent": {
      "name": "REDACTED"
    },
    "log": {
      "file": {
        "path": "/var/log/suricata/eve.json"
      },
      "offset": 652069717
    },
    "destination": {
      "port": REDACTED,
      "bytes": 0,
      "ip": "REDACTED",
      "packets": 0
    },
    "source": {
      "geo": {
        "continent_name": "REDACTED",
        "country_iso_code": "REDACTED",
        "country_name": "REDACTED",
        "location": {
          "lon": REDACTED,
          "lat": REDACTED
        }
      },
      "as": {
        "number": REDACTED,
        "organization": {
          "name": "REDACTED"
        }
      },
      "port": REDACTED,
      "bytes": 318,
      "ip": "REDACTED",
      "packets": 2
    },
    "fileset": {},
    "network": {
      "community_id": "1:REDACTED",
      "bytes": 318,
      "transport": "tcp",
      "packets": 2,
      "direction": "inbound"
    },
    "tags": [
      "suricata",
      "beats_input_raw_event"
    ],
    "input": {},
    "@timestamp": "2022-07-26T14:51:32.179Z",
    "ecs": {},
    "related": {
      "ip": [
        "REDACTED",
        "REDACTED"
      ]
    },
    "service": {},
    "@version": "1",
    "host": {},
    "suricata": {
      "eve": {
        "tcp": {
          "tcp_flags_ts": "00",
          "tcp_flags_tc": "00",
          "tcp_flags": "00"
        },
        "community_id": "1:REDACTED",
        "event_type": "flow",
        "vlan": [
          6
        ],
        "flow_id": "REDACTED",
        "flow": {
          "reason": "timeout",
          "alerted": false,
          "state": "new",
          "age": 0
        }
      }
    },
    "event": {
      "duration": 17000000,
      "ingested": "2022-07-26T14:51:33.065975148Z",
      "original": "{\"timestamp\":\"2022-07-26T16:51:32.179196+0200\",\"flow_id\":REDACTED,\"in_iface\":\"REDACTED\",\"event_type\":\"flow\",\"vlan\":[6],\"src_ip\":\"REDACTED\",\"src_port\":REDACTED,\"dest_ip\":\"REDACTED\",\"dest_port\":REDACTED,\"proto\":\"TCP\",\"flow\":{\"pkts_toserver\":2,\"pkts_toclient\":0,\"bytes_toserver\":318,\"bytes_toclient\":0,\"start\":\"2022-07-26T16:48:45.891794+0200\",\"end\":\"2022-07-26T16:48:45.908620+0200\",\"age\":0,\"state\":\"new\",\"reason\":\"timeout\",\"alerted\":false},\"community_id\":\"1REDACTED\",\"tcp\":{\"tcp_flags\":\"00\",\"tcp_flags_ts\":\"00\",\"tcp_flags_tc\":\"00\"}}",
      "created": "2022-07-26T14:51:32.624Z",
      "kind": "event",
      "start": "2022-07-26T14:48:45.891Z",
      "end": "2022-07-26T14:48:45.908Z",
      "category": [
        "network"
      ],
      "type": [
        "connection",
        "start"
      ],
      "dataset": "suricata.eve"
    }
  },
  "fields": {
    "suricata.eve.community_id.keyword": [
      "1REDACTED"
    ],
    "event.category": [
      "network"
    ],
    "event.category.keyword": [
      "network"
    ],
    "event.dataset.keyword": [
      "suricata.eve"
    ],
    "suricata.eve.tcp.tcp_flags": [
      "00"
    ],
    "suricata.eve.tcp.tcp_flags_tc": [
      "00"
    ],
    "related.ip.keyword": [
      "REDACTED",
      "REDACTED"
    ],
    "event.kind.keyword": [
      "event"
    ],
    "source.ip": [
      "REDACTED"
    ],
    "agent.name": [
      "REDACTED"
    ],
    "suricata.eve.flow.reason": [
      "timeout"
    ],
    "suricata.eve.event_type": [
      "flow"
    ],
    "network.community_id": [
      "REDACTED"
    ],
    "suricata.eve.tcp.tcp_flags_tc.keyword": [
      "00"
    ],
    "event.kind": [
      "event"
    ],
    "suricata.eve.flow_id": [
      "REDACTED"
    ],
    "suricata.eve.tcp.tcp_flags.keyword": [
      "00"
    ],
    "event.original": [
      "{\"timestamp\":\"2022-07-26T16:51:32.179196+0200\",\"flow_id\":REDACTED,\"in_iface\":\"REDACTED\",\"event_type\":\"flow\",\"vlan\":[6],\"src_ip\":\"REDACTED\",\"src_port\":REDACTED,\"dest_ip\":\"REDACTED\",\"dest_port\":REDACTED,\"proto\":\"TCP\",\"flow\":{\"pkts_toserver\":2,\"pkts_toclient\":0,\"bytes_toserver\":318,\"bytes_toclient\":0,\"start\":\"2022-07-26T16:48:45.891794+0200\",\"end\":\"2022-07-26T16:48:45.908620+0200\",\"age\":0,\"state\":\"new\",\"reason\":\"timeout\",\"alerted\":false},\"community_id\":\"1REDACTED\",\"tcp\":{\"tcp_flags\":\"00\",\"tcp_flags_ts\":\"00\",\"tcp_flags_tc\":\"00\"}}"
    ],
    "source.geo.location.lon": [
      REDACTED
    ],
    "source.packets": [
      2
    ],
    "suricata.eve.tcp.tcp_flags_ts": [
      "00"
    ],
    "suricata.eve.community_id": [
      "1:REDACTED="
    ],
    "network.packets": [
      2
    ],
    "source.as.organization.name.keyword": [
      "REDACTED"
    ],
    "@version.keyword": [
      "1"
    ],
    "log.offset": [
      652069717
    ],
    "suricata.eve.flow.reason.keyword": [
      "timeout"
    ],
    "tags": [
      "suricata",
      "beats_input_raw_event"
    ],
    "destination.ip.keyword": [
      "REDACTED"
    ],
    "source.port": [
      REDACTED
    ],
    "event.created": [
      "2022-07-26T14:51:32.624Z"
    ],
    "source.ip.keyword": [
      "REDACTED"
    ],
    "destination.bytes": [
      0
    ],
    "event.start": [
      "2022-07-26T14:48:45.891Z"
    ],
    "source.as.number": [
      REDACTED
    ],
    "suricata.eve.flow.state.keyword": [
      "new"
    ],
    "network.community_id.keyword": [
      "1:REDACTED"
    ],
    "destination.port": [
      REDACTED
    ],
    "tags.keyword": [
      "suricata",
      "beats_input_raw_event"
    ],
    "event.end": [
      "2022-07-26T14:48:45.908Z"
    ],
    "suricata.eve.event_type.keyword": [
      "flow"
    ],
    "destination.packets": [
      0
    ],
    "event.type.keyword": [
      "connection",
      "start"
    ],
    "suricata.eve.flow.state": [
      "new"
    ],
    "related.ip": [
      "REDACTED",
      "REDACTED"
    ],
    "source.geo.country_iso_code": [
      "REDACTED"
    ],
    "@version": [
      "1"
    ],
    "source.geo.country_iso_code.keyword": [
      "REDACTED"
    ],
    "network.bytes": [
      318
    ],
    "network.direction": [
      "inbound"
    ],
    "log.file.path.keyword": [
      "/var/log/suricata/eve.json"
    ],
    "source.bytes": [
      318
    ],
    "network.direction.keyword": [
      "inbound"
    ],
    "suricata.eve.flow_id.keyword": [
      "REDACTED"
    ],
    "suricata.eve.flow.alerted": [
      false
    ],
    "agent.name.keyword": [
      "REDACTED"
    ],
    "suricata.eve.flow.age": [
      0
    ],
    "source.as.organization.name": [
      "REDACTED"
    ],
    "source.geo.continent_name": [
      "REDACTED"
    ],
    "network.transport.keyword": [
      "tcp"
    ],
    "destination.ip": [
      "REDACTED"
    ],
    "network.transport": [
      "tcp"
    ],
    "event.duration": [
      17000000
    ],
    "suricata.eve.tcp.tcp_flags_ts.keyword": [
      "00"
    ],
    "event.ingested": [
      "2022-07-26T14:51:33.065Z"
    ],
    "@timestamp": [
      "2022-07-26T14:51:32.179Z"
    ],
    "suricata.eve.vlan": [
      6
    ],
    "source.geo.location.lat": [
      REDACTED
    ],
    "event.type": [
      "connection",
      "start"
    ],
    "log.file.path": [
      "/var/log/suricata/eve.json"
    ],
    "source.geo.country_name.keyword": [
      "REDACTED"
    ],
    "source.geo.continent_name.keyword": [
      "REDACTED"
    ],
    "source.geo.country_name": [
      "REDACTED"
    ],
    "event.dataset": [
      "suricata.eve"
    ]
  },
  "ignored_field_values": {
    "event.original.keyword": [
      "{\"timestamp\":\"2022-07-26T16:51:32.179196+0200\",\"flow_id\":REDACTED,\"in_iface\":\"REDACTED\",\"event_type\":\"flow\",\"vlan\":[6],\"src_ip\":\"REDACTED\",\"src_port\":REDACTED,\"dest_ip\":\"REDACTED\",\"dest_port\":REDACTED,\"proto\":\"TCP\",\"flow\":{\"pkts_toserver\":2,\"pkts_toclient\":0,\"bytes_toserver\":318,\"bytes_toclient\":0,\"start\":\"2022-07-26T16:48:45.891794+0200\",\"end\":\"2022-07-26T16:48:45.908620+0200\",\"age\":0,\"state\":\"new\",\"reason\":\"timeout\",\"alerted\":false},\"community_id\":\"1:REDACTED\",\"tcp\":{\"tcp_flags\":\"00\",\"tcp_flags_ts\":\"00\",\"tcp_flags_tc\":\"00\"}}"
    ]
  }
}

Other filters I've created also do not work, all are similar: an if condition on some fields, then if the conditions are satisfied, drop { } is called. How can I achieve what I'm trying to do, which is ignoring some stuff that is sent to Logstash but that I do not want in my ES? Maybe I have some fundamental misunderstanding about how things work. I also suspect those "keyword" fields: why are they created in the first place? Why can't I just have the normal fields and work with that? I've tried adding "[keyword]" at the end of my fields in the filter, same result.

Anyhow, running bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/my-logstash.conf only returns ECS warnings (because I didn't set it), then "Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash".

I use the up-to-date Debian packages on ES's repo, so v7.17.5 for all ES tools. Any help would be greatly appreciated! Thanks a lot.

Hi there, just to confirm - is Filebeat writing to Logstash or directly to Elasticsearch?
Some of the fields you see in Kibana are added by an ingest pipeline in Elasticsearch, so some of the fields are actually created after your Logstash pipeline is executed.

See the ingest pipeline for Suricata EVE logs

You could inspect the actual events that arrive at Logstash to see what fields you can work with; you certainly won't have [event][dataset], since this is added afterwards.

For inspection, instead of writing to Elasticsearch, first you could either:

  • Write to a file using the file output in Logstash.
  • Use a tool like this one to inspect the documents in real time.

Then you can see which fields you can use in your if condition, but I'm almost sure you will only have [event_type] and [suricata][eve][flow][alerted] to work with.

Good luck

1 Like

Thank you for your response!

So I added a "file" output like you suggested. But this is not helping, here is a sample log written to my file:

{
  "ecs": {},
  "tags": [
    "suricata",
    "beats_input_raw_event"
  ],
  "fileset": {},
  "destination": {
    "port": 80,
    "ip": "REDACTED"
  },
  "host": {},
  "@version": "1",
  "source": {
    "port": 52265,
    "ip": "REDACTED"
  },
  "log": {
    "file": {
      "path": "/var/log/suricata/eve.json"
    },
    "offset": 128305309
  },
  "input": {},
  "service": {},
  "agent": {
    "name": "REDACTED"
  },
  "suricata": {
    "eve": {
      "event_type": "flow",
      "tcp": {
        "state": "syn_sent",
        "tcp_flags_tc": "00",
        "syn": true,
        "psh": true,
        "fin": true,
        "tcp_flags_ts": "1b",
        "tcp_flags": "1b",
        "ack": true
      },
      "flow": {
        "bytes_toserver": 656,
        "bytes_toclient": 0,
        "state": "new",
        "age": 109,
        "alerted": false,
        "pkts_toserver": 7,
        "reason": "unknown",
        "end": "2022-07-27T09:15:52.862858+0200",
        "start": "2022-07-27T09:14:03.166736+0200",
        "pkts_toclient": 0
      },
      "flow_id": "REDACTED",
      "vlan": [
        13
      ],
      "community_id": "1:REDACTED"
    }
  },
  "event": {
    "dataset": "suricata.eve",
    "created": "2022-07-27T07:17:27.915Z",
    "original": "{\"timestamp\":\"2022-07-27T09:17:27.415266+0200\",\"flow_id\":REDACTED,\"in_iface\":\"REDACTED\",\"event_type\":\"flow\",\"vlan\":[13],\"src_ip\":\"REDACTED\",\"src_port\":52265,\"dest_ip\":\"REDACTED\",\"dest_port\":80,\"proto\":\"TCP\",\"flow\":{\"pkts_toserver\":7,\"pkts_toclient\":0,\"bytes_toserver\":656,\"bytes_toclient\":0,\"start\":\"2022-07-27T09:14:03.166736+0200\",\"end\":\"2022-07-27T09:15:52.862858+0200\",\"age\":109,\"state\":\"new\",\"reason\":\"unknown\",\"alerted\":false},\"community_id\":\"REDACTED\",\"tcp\":{\"tcp_flags\":\"1b\",\"tcp_flags_ts\":\"1b\",\"tcp_flags_tc\":\"00\",\"syn\":true,\"fin\":true,\"psh\":true,\"ack\":true,\"state\":\"syn_sent\"}}"
  },
  "@timestamp": "2022-07-27T07:17:27.415Z",
  "network": {
    "transport": "TCP",
    "direction": "outbound",
    "community_id": "1:REDACTED"
  }
}

So it seems [event][dataset] should be working, no? Though [event][kind] does not exist. I changed my condition to if [suricata][eve][flow][alerted] == "false". But no luck, I still have those logs that I do not want.

Another example: I also use the zeek module, and want to ignore logs concerning a file that has the string "FOOBAR" in its filename. This is a sample log that should not be indexed (again obtained with the file output):

{
  "tags": [
    "zeek.smb_files",
    "beats_input_raw_event"
  ],
  "destination": {
    "port": 445,
    "ip": "REDACTED"
  },
  "fileset": {},
  "host": {},
  "network": {
    "protocol": "smb",
    "transport": "tcp",
    "direction": "internal",
    "community_id": "REDACTED"
  },
  "ecs": {},
  "@version": "1",
  "source": {
    "port": 49493,
    "ip": "REDACTED"
  },
  "log": {
    "file": {
      "path": "/opt/zeek/logs/current/smb_files.log"
    },
    "offset": 14417240
  },
  "input": {},
  "service": {},
  "file": {
    "name": "fooFOOBARbar.myfiletobeignored",
    "size": 268435456
  },
  "agent": {
    "name": "REDACTED"
  },
  "event": {
    "dataset": "zeek.smb_files",
    "action": "SMB::FILE_OPEN",
    "category": [
      "network",
      "file"
    ],
    "kind": "event",
    "id": "REDACTED",
    "type": [
      "connection",
      "protocol"
    ]
  },
  "@timestamp": "2022-07-27T07:23:04.987Z",
  "zeek": {
    "session_id": "REDACTED",
    "smb_files": {
      "times.modified": 1658906584.3919132,
      "action": "SMB::FILE_OPEN",
      "size": 268435456,
      "times.created": 1657275110.0366983,
      "path": "REDACTED",
      "name": "fooFOOBARbar.myfiletobeignored",
      "times.accessed": 1657275110.0366983,
      "ts": 1658906584.399471,
      "times.changed": 1658906584.3919132
    }
  }
}

And this is the condition: if [file][name] =~ /.*FOOBAR.*/. I've also tried [zeek][smb_files][name]. Again, no luck, it's still indexed. And for this log, [event][kind] exists.

I have no idea if the filter section of my logstash conf is even considered... :frowning:

If you have more ideas I'd be glad to hear them. Thanks a lot!

Edit: to answer your question, Filebeat does write to Logstash and not directly to ES. ES isn't even accessible to the Filebeat hosts, only the Logstash host can contact it

You are actually comparing a boolean field with the string "false", so the condition does not match; test the field directly instead, like this:

if [suricata][eve][flow][alerted] { ... }

Thank you! After some trial and error I got it to work with the following condition:

if (! [suricata][eve][flow][alerted] or [suricata][eve][event_type] == "alert") and "suricata" in [tags] {

This way, events that produce an alert, and the alert itself, are indexed. Everything else is dropped.

I still have the problem described with the zeek module; that one I still couldn't figure out.

You can still use if "FOOBAR" in [file][name] { instead of a regex, but if [file][name] =~ /.*FOOBAR.*/ { also works.

I highly recommend that you use something to test your pipeline and, more importantly, to ITERATE on its development, be it with the file{} output or Logshark.