Grok solution in Filebeat

Hello community,

Having run into the problem of how to apply grok-style parsing in Filebeat, I want to share the solution I found using the PROCESSORS section and the dissect processor. I hope it helps you. The example also covers having several inputs and generating different index patterns.

Step 1. Configure the inputs

Configure the Fortinet and CloudWatch inputs in the filebeat.yml file. First, list the available modules and, for this example, enable the Filebeat fortinet module as follows:

filebeat modules list
filebeat modules enable fortinet

And configure the two different inputs as follows:

# ============================== filebeat inputs ===============================

filebeat.inputs:
- type: http_endpoint
  enabled: true
  listen_address: 192.168.1.1
  listen_port: 8080
  response_code: 200
  response_body: '{"message": "success"}'
  url: "/"
  prefix: "json"
  tags: ["cloudguard"]

- type: aws-cloudwatch
  enabled: true
  access_key_id: '${AWS_ACCESS_KEY_ID}'
  secret_access_key: '${AWS_SECRET_ACCESS_KEY}'
  log_group_arn: arn:aws:logs:us-east-1:428152502467:log-group:test:*
  start_position: end
  fields:
    source: aws_cloudwatch

# ============================== Filebeat modules ==============================

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
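After enabling the module, the generated modules.d/fortinet.yml can be adjusted to point at your syslog listener. A minimal sketch (the input type, host, and port are example values; adjust them to your environment):

```yaml
# modules.d/fortinet.yml (sketch; host/port are example values)
- module: fortinet
  firewall:
    enabled: true
    # Receive FortiGate logs over syslog/UDP
    var.input: udp
    var.syslog_host: 0.0.0.0
    var.syslog_port: 9004
```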

Step 2. Configure templates and indices

In the template section, since there are two index entries, it is necessary to configure the following structure:


# ======================= Elasticsearch template setting =======================

setup.template.settings:
  index.number_of_shards: 1
setup.ilm.enabled: false
output.elasticsearch:
  indices:
    - index: "logs-awspostgresql-default"
      when.contains:
        fields.source: "aws_cloudwatch"
      setup.template:
        name: "logs-awspostgresql-default"
        pattern: "logs-awspostgresql-default"

    - index: "logs-fortinet-default"
      when.equals:
        event.module: "fortinet"
      setup.template:
        name: "logs-fortinet-default"
        pattern: "logs-fortinet-default"

when.equals is used to check whether the value of a field is exactly equal to a specific value, while when.contains is used to check whether a specific string is contained in the field's value.

In summary, the when.equals condition filters events based on the exact value of a given field (as in the example of event.module: "fortinet"), while the when.contains condition filters events based on whether a text string appears in the field's value (as in the example of fields.source: "aws_cloudwatch"). These conditions can be used in Filebeat to route events according to different criteria.
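The same two conditions work anywhere Filebeat accepts a condition, not only under output.elasticsearch.indices. A hedged sketch using them in processors (the tag name "firewall" is an invented example; the field names mirror the ones used above):

```yaml
processors:
  # Drop any event whose fields.source does not contain "aws_cloudwatch"
  - drop_event:
      when:
        not:
          contains:
            fields.source: "aws_cloudwatch"
  # Tag events coming from the fortinet module (exact match)
  - add_tags:
      tags: ["firewall"]
      when:
        equals:
          event.module: "fortinet"
```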

Step 3. Output configuration

As in this example the logs will be sent to an Elasticsearch instance on Elastic Cloud, configure the Elastic Cloud section as follows:

# =============================== Elastic Cloud ================================
cloud.id: "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYNTyc0Mw=="
cloud.auth: "elastic:YOUR_PASSWORD"

Step 4. Filebeat "grok" configuration

For the grok-style configuration, the processors section of filebeat.yml is used. Processors can perform different actions, such as adding, deleting, or modifying fields, transforming log event content, enriching events with metadata, and much more.

Processors are very useful for cleaning and normalizing log events before they are stored or sent to other systems. Use the dissect processor when you have a structured, consistent log format and want to extract specific fields by position; grok-style pattern matching suits a less structured format where you need to identify complex patterns in the logs.
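To illustrate the difference: dissect splits a line by fixed delimiters, position by position, while grok matches regular-expression patterns. A minimal dissect sketch (the sample log line and field names are invented for illustration):

```yaml
# Input message (example):
#   "2023-05-01 10:15:32 INFO login user=alice"
processors:
  - dissect:
      tokenizer: "%{log.date} %{log.time} %{log.level} %{log.action} user=%{user.name}"
      field: "message"
      target_prefix: ""
```

With the same line, a grok pattern (for example in an Elasticsearch ingest pipeline) would use patterns like %{TIMESTAMP_ISO8601} and %{LOGLEVEL}, which tolerate variable formatting at the cost of regular-expression complexity.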

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

  - if:
      equals:
        input.type: aws-cloudwatch
    then:
      - dissect:
          tokenizer: "%{awspostgresql.log.timestamp} UTC:%{awspostgresql.log.client_addr}(%{awspostgresql.log.core_id}):%{awspostgresql.log.user}@%{awspostgresql.log.database}:[%{awspostgresql.log.session_id}]:%{awspostgresql.log.level}:%{},WRITE,%{awspostgresql.log.command_tag},,,%{awspostgresql.log.query_name}"
          field: "message"
          target_prefix: ""

      - dissect:
          tokenizer: "%{awspostgresql.log.timestamp} UTC:%{awspostgresql.log.client_addr}(%{awspostgresql.log.core_id}):%{awspostgresql.log.user}@%{awspostgresql.log.database}:[%{awspostgresql.log.session_id}]:%{awspostgresql.log.level}:%{},DDL,%{awspostgresql.log.command_tag},,,%{awspostgresql.log.query_name}"
          field: "message"
          target_prefix: ""

      - dissect:
          tokenizer: "%{awspostgresql.log.timestamp} UTC:%{awspostgresql.log.client_addr}(%{awspostgresql.log.core_id}):%{awspostgresql.log.user}@%{awspostgresql.log.database}:[%{awspostgresql.log.session_id}]:%{awspostgresql.log.level}:  %{awspostgresql.log.detail}"
          field: "message"
          target_prefix: ""

      - dissect:
          tokenizer: "%{awspostgresql.log.timestamp} UTC:%{awspostgresql.log.client_addr}(%{awspostgresql.log.core_id}):%{awspostgresql.log.user}@%{awspostgresql.log.database}:[%{awspostgresql.log.session_id}]:%{awspostgresql.log.level}:  %{awspostgresql.log.query_name}"
          field: "message"
          target_prefix: ""

      - timestamp:
          field: event.ingested
          target_field: "@timestamp"
          layouts:
            - '2006-01-02T15:04:05Z'
            - '2006-01-02T15:04:05.999Z'
            - '2006-01-02T15:04:05.999-07:00'
          test:
            - '2019-06-22T16:33:51Z'
            - '2019-11-18T04:59:51.123Z'
            - '2020-08-03T07:10:20.123456+02:00'

  - drop_fields:
      fields: ["event.ingested","event.original","event.agent_id_status","host","mac","hostname","architecture","awscloudwatch.ingestion_time","log.flags","agent.name"]
      ignore_missing: true
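Assuming an RDS PostgreSQL log line in the shape the third tokenizer above expects, the dissect processor would produce fields along these lines (the sample values are invented for illustration):

```yaml
# Input message (example):
#   2023-05-01 10:15:32 UTC:10.0.0.5(4321):admin@mydb:[12345]:LOG:  duration: 1.23 ms
# Resulting fields (approximate):
#   awspostgresql.log.timestamp:   "2023-05-01 10:15:32"
#   awspostgresql.log.client_addr: "10.0.0.5"
#   awspostgresql.log.core_id:     "4321"
#   awspostgresql.log.user:        "admin"
#   awspostgresql.log.database:    "mydb"
#   awspostgresql.log.session_id:  "12345"
#   awspostgresql.log.level:       "LOG"
#   awspostgresql.log.detail:      "duration: 1.23 ms"
```

Note that if a tokenizer does not match the incoming line, dissect leaves the event unchanged (or flags an error), so listing several tokenizers as above lets different line shapes be handled in turn.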

I hope it helps.

