Filebeat filtering, drop event processor script

Hello team, I'm new to Filebeat and I want to ask about the processor script in Filebeat.
I have a log file that contains several event.code values. I want to exclude three event codes from my log, based on the condition below:

event.code : (1234 OR 4567 OR 7890) AND (event.duration < 3600000000000 OR event.bytes < 100000000)

Here's my processor configuration in filebeat.yml:

processors: 
  - drop_event: 
      when: 
        - or: 
            - equals:
                event.code: "1234"
            - and: 
                - or: 
                    - range:
                        event.duration.lt: 3600000000000
                    - range:
                        event.bytes.lt: 100000000
            - equals:
                event.code: "4567"
            - and: 
                - or: 
                    - range:
                        event.duration.lt: 3600000000000
                    - range:
                        event.bytes.lt: 100000000
            - equals:
                event.code: "7890"
            - and: 
                - or: 
                    - range:
                        event.duration.lt: 3600000000000
                    - range:
                        event.bytes.lt: 100000000

When I try to run this configuration, I get this error from Filebeat:

2022-01-27T10:45:08.318+0700    INFO    instance/beat.go:686    Home path: [/home/app/firewall_ftd-fb-new/filebeat-7.16.2-linux-x86_64] Config path: [/home/app/firewall_ftd-fb-new/filebeat-7.16.2-linux-x86_64] Data path: [/data/filebeat-queue_ftd] Logs path: [/home/app/firewall_ftd-fb-new/filebeat-7.16.2-linux-x86_64/logs] Hostfs Path: [/]
2022-01-27T10:45:08.318+0700    INFO    instance/beat.go:694    Beat ID: 73a638c9-74b4-4fac-a29f-524eab3d5df2
2022-01-27T10:45:08.800+0700    WARN    [add_cloud_metadata]    add_cloud_metadata/provider_aws_ec2.go:95       error when check request status for getting IMDSv2 token: http request status 404. No token in the metadata request will be used.
2022-01-27T10:45:08.802+0700    ERROR   instance/beat.go:1015   Exiting: error initializing processors: failed to initialize condition: missing or invalid condition
Exiting: error initializing processors: failed to initialize condition: missing or invalid condition

Can you suggest how to solve this error?
Thank you, team

What I learned about YAML in the last two weeks or so:
4-space indentation after a dash, 2-space indentation after anything that is not a dash.

Besides that, I don't think you need a dash for the conditions themselves ("equals"/"range").
The thing I can't help you with is the nesting order. I have no clue how "or" and "and" should nest.
Try the above for now, though.
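
For what it's worth, YAML itself does not mandate 2 or 4 spaces; it only needs sibling entries to line up. In Filebeat conditions, "when" takes a single condition (a mapping), while "or" and "and" each take a list, so their children do need dashes. A minimal sketch of both list layouts follows; the top-level keys "compact_style" and "indented_style" are hypothetical names used only for illustration, not Filebeat settings.

```yaml
# Hypothetical keys for illustration only; not real Filebeat settings.
# Both layouts parse to the same structure.
compact_style:
  when:              # "when" holds one condition, so no dash here
    or:              # "or" holds a list, so each child starts with a dash
    - equals:
        event.code: "1234"
    - equals:
        event.code: "4567"

indented_style:
  when:
    or:
      - equals:
          event.code: "1234"
      - equals:
          event.code: "4567"
```

Either layout should work, as long as it is used consistently within one list.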

In terms of logical order I would do:

(THIS IS PSEUDO CODE!) 
and:
  - or:
      - condition
      - condition
      - condition
  - or:
      - condition
      - condition

This would be (code=1 or code=2 or code=3) AND (duration = 1 or bytes = 2)

Hello @ziv1, thank you for your help. Let me try it and I will post an update as soon as possible.

Hello! According to the docs, it seems that "when" is not defined as a list. Other than that, the rest looks syntactically correct:

processors:
  - drop_event:
      when:
        or:
          - equals:
              event.code: "1234"
          - and:
            - or:
              - range:
                  event.duration.lt: 3600000000000
              - range:
                  event.bytes.lt: 100000000
          - equals:
              event.code: "4567"
          - and:
            - or:
              - range:
                  event.duration.lt: 3600000000000
              - range:
                  event.bytes.lt: 100000000
          - equals:
              event.code: "7890"
          - and:
            - or:
              - range:
                  event.duration.lt: 3600000000000
              - range:
                  event.bytes.lt: 100000000

I managed to start Filebeat correctly just by dropping the "-" before the first "or" under "when". Hope this helps!

Edit to add:

Syntax aside, I think your intention is to have the range conditions tied to each of the event code checks. If so, your conditions should look like this:

processors:
  - drop_event:
      when:
        and:
          - or:
            - equals:
                event.code: "1234"
            - equals:
                event.code: "4567"
            - equals:
                event.code: "7890"
          - or:
            - range:
                event.duration.lt: 3600000000000
            - range:
                event.bytes.lt: 100000000

If this is not the case please disregard this :smiley:

Hello @marc.guasch, thank you for your help and suggestion. Let me try your YAML and I will update after that.

Hello @marc.guasch, I want to ask again about this case.
I already applied your syntax to my Filebeat config, but the filter does not work and the logs are still inserted into my Elasticsearch.

Is this happening because I don't parse my log, or is it something else?

Any suggestions?

Thank you

Hello!

It is hard to tell without some more information. Could you share your full filebeat config and one of the events that got inserted to Elasticsearch and that should have been dropped?

Please remember to redact any private data from the config or the events.
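
One way to gather that information is to temporarily print events to the console, which shows the fields as they exist when Filebeat's processors run. This is only a sketch, assuming the Elasticsearch output is commented out for the duration of the test, since Filebeat accepts a single enabled output. If a field such as event.code turns out to be absent here (for example because it is added later by an Elasticsearch ingest pipeline, which is an assumption about this setup and not something confirmed in the thread), a drop_event condition on it cannot match.

```yaml
# Temporary debugging output: write each event to stdout as formatted JSON.
# Comment out output.elasticsearch while this is enabled, because Filebeat
# allows only one output at a time.
output.console:
  pretty: true
```

Events that still appear in the console output were not dropped by the processor.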

Hello @marc.guasch, here's my filebeat.conf:

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: false

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/*.log

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ['^DBG']

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ['^ERR', '^WARN']

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: ['.gz$']

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Multiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be appended to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to next in Logstash
  #multiline.match: after

# filestream is an experimental input. It is going to replace log input in the future.
- type: filestream

  # Change to true to enable this input configuration.
  enabled: false

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/*.log

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ['^DBG']

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ['^ERR', '^WARN']

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #prospector.scanner.exclude_files: ['.gz$']

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

#- type: udp
#  enabled: true
#
# ============================== Filebeat modules ==============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

# ======================= Elasticsearch template setting =======================

#setup.template.settings:
  index.number_of_shards: 1

# ================================== General ===================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:


# ================================= Dashboards =================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here or by using the `setup` command.
#setup.dashboards.enabled: false

# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#setup.dashboards.url:

# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
#setup.kibana:

# =============================== Elastic Cloud ================================

# These settings simplify using Filebeat with the Elastic Cloud (https://cloud.elastic.co/).

# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
#cloud.id:

# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.
#cloud.auth:

# ================================== Outputs ===================================

# Configure what output to use when sending the data collected by the beat.

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["host ip"]

  # Protocol - either `http` (default) or `https`.
  protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: "xxx"
  password: "xxx"
  ssl.verification_mode: none
  ssl.certificate_authorities: ["xxx"]
  ssl.certificate: "xxx"
  ssl.key: "xxx"
  worker: 25
  bulk_max_size: 1500


output.elasticsearch.index: "elasticsearch-index-test"
setup.template.enabled: false
setup.ilm.enabled: auto
setup.ilm.rollover_alias: "elasticsearch-index-test"
setup.ilm.policy_name: "elastic_index"

# ------------------------------ Logstash Output -------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
  - - drop_event:
      when:
        and:
          - or:
            - equals:
                event.code: "1234"
            - equals:
                event.code: "4567"
            - equals:
                event.code: "7890"
          - or:
            - range:
                event.duration.lt: 3600000000000
            - range:
                event.bytes.lt: 100000000

# ================================== Logging ===================================

# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publisher", "service".
#logging.selectors: ["*"]

# ============================= X-Pack Monitoring ==============================
# Filebeat can export internal metrics to a central Elasticsearch monitoring
# cluster.  This requires xpack monitoring to be enabled in Elasticsearch.  The
# reporting is disabled by default.

# Set to true to enable the monitoring reporter.
#monitoring.enabled: false

# Sets the UUID of the Elasticsearch cluster under which monitoring data for this
# Filebeat instance will appear in the Stack Monitoring UI. If output.elasticsearch
# is enabled, the UUID is derived from the Elasticsearch cluster referenced by output.elasticsearch.
#monitoring.cluster_uuid:

# Uncomment to send the metrics to Elasticsearch. Most settings from the
# Elasticsearch output are accepted here as well.
# Note that the settings should point to your Elasticsearch *monitoring* cluster.
# Any setting that is not set is automatically inherited from the Elasticsearch
# output configuration, so if you have the Elasticsearch output configured such
# that it is pointing to your Elasticsearch monitoring cluster, you can simply
# uncomment the following line.
#monitoring.elasticsearch:

# ============================== Instrumentation ===============================

# Instrumentation support for the filebeat.
#instrumentation:
    # Set to true to enable instrumentation of filebeat.
    #enabled: false

    # Environment in which filebeat is running on (eg: staging, production, etc.)
    #environment: ""

    # APM Server hosts to report instrumentation results to.
    #hosts:
    #  - http://localhost:8200

    # API Key for the APM Server(s).
    # If api_key is set then secret_token will be ignored.
    #api_key:

    # Secret token for the APM Server(s).
    #secret_token:


# ================================= Migration ==================================

# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true

#http.enabled: true
#http.host: localhost
#http.port: 5066

#queue:
#  mem:
#    events: 468640
#
#disk:
#  max_size: 10GB

And this is the log that was inserted into my Elasticsearch index:

@timestamp:Feb 2, 2022 @ 10:45:06.771 agent.ephemeral_id:ca75aa09-52ed-408b-8858-39da4c3fde36 agent.id:46a09d59-f15e-4a79-8c89-54f466d1a64z agent.type:filebeat agent.version:7.16.2 cisco.ftd.connection_id:xxx cisco.ftd.destination_interface:Cloud.2 cisco.ftd.message_id:1234 cisco.ftd.source_interface:Cloud.1  cisco.ftd.suffix:session cloud.availability_zone:AZ-IT cloud.instance.id:346557df cloud.provider:huawei cloud.region:(empty) cloud.service.name:ECS destination.address:xxx destination.ip:xxx destination.port:xxx ecs.version:1.12.0 event.action:flow-expiration event.category:network event.code:1234 event.dataset:cisco.ftd event.duration:130,000,000,000 event.end:Feb 2, 2022 @ 10:45:06.771 event.ingested:Feb 2, 2022 @ 10:45:07.783 event.kind:event event.module:cisco event.original::Feb 02
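
As a quick sanity check, the fields visible in this event can be compared with the thresholds in the condition. The sketch below restates them as annotated YAML; it assumes event.duration is in nanoseconds, as ECS defines the field, and that Kibana's "130,000,000,000" is display formatting for 130000000000.

```yaml
# Values copied from the event above, annotated against the drop condition.
event.code: "1234"            # one of the three listed codes (stored type unknown; quoting is an assumption)
event.duration: 130000000000  # 130 s in nanoseconds; the threshold 3600000000000 ns is 3600 s (1 hour)
# 130000000000 < 3600000000000, so the duration branch of the condition holds.
# event.bytes does not appear in the excerpt, so that branch cannot be checked.
```

On these values alone the event looks like one that should have been dropped, which points at how or when the fields are compared, not at the thresholds.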

I think this should be "- drop_event:" instead; there is an extra "-".

Hello @marc.guasch

Sorry for the typing mistake, the double "-" on drop_event.

I tried the YAML again without the double "-", but the result is still the same: the logs are still inserted into my Elasticsearch index.

Any other suggestions?

What's the mapping on event.code? Are you sure it's indexed as text? Maybe you are trying to compare a string with a number.
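
To illustrate the string-versus-number point: as far as I understand the "equals" condition, it accepts a string or an integer, and a quoted value is compared as a string. Note that the Elasticsearch mapping does not necessarily reflect the type the field has inside Filebeat. A hedged sketch that covers both cases while the real type of event.code is still unknown:

```yaml
processors:
  - drop_event:
      when:
        and:
          - or:
              # Quoted: expected to match when event.code holds the string "1234".
              - equals:
                  event.code: "1234"
              # Unquoted: expected to match when event.code holds the integer 1234.
              - equals:
                  event.code: 1234
          - or:
              - range:
                  event.duration.lt: 3600000000000
              - range:
                  event.bytes.lt: 100000000
```

Only one code is shown; the other two would follow the same pattern.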

Can you share the json document as it gets inserted? Then we can figure out if the conditions are not the correct ones. Thanks!
