Hello community,
Having run into the problem of how to apply grok-style parsing in Filebeat, I want to share the solution I found using the processors section and the dissect processor, along with how to handle several inputs and generate different index patterns. I hope it helps you.
Step 1. Configure the inputs
Configure the Fortinet and CloudWatch inputs in the filebeat.yml file. First list the available modules and, for this example, enable the Filebeat fortinet module as follows:
filebeat modules list
filebeat modules enable fortinet
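Enabling the module activates modules.d/fortinet.yml, where the module itself is configured. A minimal sketch of its firewall fileset (var.input, var.syslog_host and var.syslog_port are the module's documented settings; the values shown here are illustrative, adjust them to your environment):
- module: fortinet
  firewall:
    enabled: true
    var.input: udp
    var.syslog_host: 0.0.0.0
    var.syslog_port: 9004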
Then configure the two inputs as follows:
# ============================== Filebeat inputs ===============================
filebeat.inputs:
- type: http_endpoint
  enabled: true
  listen_address: 192.168.1.1
  listen_port: 8080
  response_code: 200
  response_body: '{"message": "success"}'
  url: "/"
  prefix: "json"
  tags: ["cloudguard"]

- type: aws-cloudwatch
  enabled: true
  access_key_id: '${AWS_ACCESS_KEY_ID}'
  secret_access_key: '${AWS_SECRET_ACCESS_KEY}'
  log_group_arn: arn:aws:logs:us-east-1:428152502467:log-group:test:*
  start_position: end
  fields:
    source: aws_cloudwatch

# ============================== Filebeat modules ==============================
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
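At this point you can sanity-check the configuration and the output connectivity with Filebeat's built-in test subcommands, and send a sample event to the http_endpoint input (the curl payload is illustrative):
filebeat test config
filebeat test output
curl -X POST "http://192.168.1.1:8080/" -H "Content-Type: application/json" -d '{"test": "event"}'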
Step 2. Configure templates and indices
Since there are two input sources, the template and output sections need the following structure so that each source is written to its own index:
# ======================= Elasticsearch template setting =======================
setup.template.settings:
  index.number_of_shards: 1
setup.ilm.enabled: false

output.elasticsearch:
  indices:
    - index: "logs-awspostgresql-default"
      when.contains:
        fields.source: "aws_cloudwatch"
      setup.template:
        name: "logs-awspostgresql-default"
        pattern: "logs-awspostgresql-default"
    - index: "logs-fortinet-default"
      when.equals:
        event.module: "fortinet"
      setup.template:
        name: "logs-fortinet-default"
        pattern: "logs-fortinet-default"
when.equals compares whether the value of a field is exactly equal to a specific value, while when.contains checks whether a specific string is contained in the field's value.
In summary, the when.equals condition filters events based on the exact value of a given field (as in the example of event.module: "fortinet"), while the when.contains condition filters events based on whether a specific text string appears in the field's value (as in the example of fields.source: "aws_cloudwatch"). These conditions can be used anywhere Filebeat accepts conditions, not only for index routing, as the sketch below shows.
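For instance, here is a minimal sketch of a drop_event processor that keeps only events from the two sources configured above (drop_event and the not/or condition combinators are standard Filebeat features; the combination itself is illustrative):
processors:
  - drop_event:
      when:
        not:
          or:
            - equals:
                event.module: "fortinet"
            - contains:
                fields.source: "aws_cloudwatch"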
Step 3. Output configuration
Since in this example the logs are sent to an Elasticsearch instance in Elastic Cloud, the output is configured in the Elastic Cloud section as follows (cloud.id and cloud.auth override the output.elasticsearch host and credential settings, while the indices conditions from Step 2 still apply):
# =============================== Elastic Cloud ================================
cloud.id: "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYNTyc0Mw=="
cloud.auth: "elastic:YOUR_PASSWORD"
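If you are not on Elastic Cloud, the equivalent self-managed configuration would instead set the hosts and credentials directly on the output from Step 2; a minimal sketch with placeholder values:
output.elasticsearch:
  hosts: ["https://localhost:9200"]
  username: "elastic"
  password: "YOUR_PASSWORD"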
Step 4. Filebeat "grok" configuration
For the grok-style configuration, the processors section of filebeat.yml is used. Processors can perform different actions, such as adding, deleting, or modifying fields, dropping events, transforming log event content, and much more.
Processors are very useful for cleaning and normalizing log events before they are stored or sent to other systems. Use the dissect processor when you have a structured, consistent log format and want to extract specific information from it; grok is meant for less structured formats where you need to match complex patterns. Since grok runs in Elasticsearch ingest pipelines or Logstash rather than in Filebeat itself, dissect is the Filebeat-side alternative used here.
# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
  - if:
      equals:
        input.type: aws-cloudwatch
    then:
      # Each dissect below targets a different message shape (WRITE audit
      # lines, DDL audit lines, detail lines, generic query lines); a
      # tokenizer that does not match leaves the event for the next one.
      - dissect:
          tokenizer: "%{awspostgresql.log.timestamp} UTC:%{awspostgresql.log.client_addr}(%{awspostgresql.log.core_id}):%{awspostgresql.log.user}@%{awspostgresql.log.database}:[%{awspostgresql.log.session_id}]:%{awspostgresql.log.level}:%{},WRITE,%{awspostgresql.log.command_tag},,,%{awspostgresql.log.query_name}"
          field: "message"
          target_prefix: ""
      - dissect:
          tokenizer: "%{awspostgresql.log.timestamp} UTC:%{awspostgresql.log.client_addr}(%{awspostgresql.log.core_id}):%{awspostgresql.log.user}@%{awspostgresql.log.database}:[%{awspostgresql.log.session_id}]:%{awspostgresql.log.level}:%{},DDL,%{awspostgresql.log.command_tag},,,%{awspostgresql.log.query_name}"
          field: "message"
          target_prefix: ""
      - dissect:
          tokenizer: "%{awspostgresql.log.timestamp} UTC:%{awspostgresql.log.client_addr}(%{awspostgresql.log.core_id}):%{awspostgresql.log.user}@%{awspostgresql.log.database}:[%{awspostgresql.log.session_id}]:%{awspostgresql.log.level}: %{awspostgresql.log.detail}"
          field: "message"
          target_prefix: ""
      - dissect:
          tokenizer: "%{awspostgresql.log.timestamp} UTC:%{awspostgresql.log.client_addr}(%{awspostgresql.log.core_id}):%{awspostgresql.log.user}@%{awspostgresql.log.database}:[%{awspostgresql.log.session_id}]:%{awspostgresql.log.level}: %{awspostgresql.log.query_name}"
          field: "message"
          target_prefix: ""
  - timestamp:
      field: event.ingested
      target_field: "@timestamp"
      layouts:
        - '2006-01-02T15:04:05Z'
        - '2006-01-02T15:04:05.999Z'
        - '2006-01-02T15:04:05.999-07:00'
      test:
        - '2019-06-22T16:33:51Z'
        - '2019-11-18T04:59:51.123Z'
        - '2020-08-03T07:10:20.123456+02:00'
  - drop_fields:
      fields: ["event.ingested","event.original","event.agent_id_status","host","mac","hostname","architecture","awscloudwatch.ingestion_time","log.flags","agent.name"]
      ignore_missing: true
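To see what the first tokenizer extracts, take a hypothetical pgaudit-style line (all values are invented for illustration):
# Hypothetical input message:
#   2023-05-04 10:15:30 UTC:10.0.0.15(4321):admin@mydb:[12345]:LOG:AUDIT: SESSION,1,1,WRITE,INSERT,,,insert into users values (1)
#
# Fields produced by the first dissect tokenizer:
#   awspostgresql.log.timestamp   -> 2023-05-04 10:15:30
#   awspostgresql.log.client_addr -> 10.0.0.15
#   awspostgresql.log.core_id     -> 4321
#   awspostgresql.log.user        -> admin
#   awspostgresql.log.database    -> mydb
#   awspostgresql.log.session_id  -> 12345
#   awspostgresql.log.level       -> LOG
#   awspostgresql.log.command_tag -> INSERT
#   awspostgresql.log.query_name  -> insert into users values (1)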
I hope it helps.