I need to add a custom field 'log.level' in filebeat.yml. Its value needs to be derived from the source field 'message': I need to extract the log level (INFO, DEBUG, ERROR, etc.) from it. The message field looks like this:
message [2021-05-04 14:57:22,588] INFO [SocketServer brokerId=1001] Failed authentication with /10.130.110.75 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
@Badger is correct. Create a dissect processor to parse the message before it is sent to Elasticsearch, or use a grok processor in a Logstash or Elasticsearch ingest pipeline if you need a regex for more flexible parsing.
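For comparison, here is a minimal sketch of the grok route as a Logstash filter, assuming the bracketed timestamp and level layout in the sample message above (the logtime, loglevel, and rest field names are just illustrative):

filter {
  grok {
    # "[2021-05-04 14:57:22,588] INFO ..." -> logtime and loglevel fields
    match => { "message" => "\[%{TIMESTAMP_ISO8601:logtime}\] %{LOGLEVEL:loglevel} %{GREEDYDATA:rest}" }
  }
}

If the layout is fixed, the dissect processor in Filebeat is the lighter option because it avoids regex matching entirely.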
I did check the dissect processor, but for some reason I was not able to figure out the tokenizer needed just to extract the log.level value from the message field. Let me take a deeper look into dissect processors.
Below is my filebeat.yml. Do my dissect processor settings look correct? I keep getting this error:

2021-05-05T14:30:22.144-0700 INFO instance/beat.go:456 filebeat stopped.
2021-05-05T14:30:22.144-0700 ERROR instance/beat.go:951 Exiting: Failed to start crawler: starting input failed: Error while initializing input: can not convert 'string' into 'object' accessing 'filebeat.inputs.0.processors.0.dissect' (source:'/etc/filebeat/filebeat.yml')
Exiting: Failed to start crawler: starting input failed: Error while initializing input: can not convert 'string' into 'object' accessing 'filebeat.inputs.0.processors.0.dissect' (source:'/etc/filebeat/filebeat.yml')

It does not clearly say what the problem is.
###################### Filebeat Configuration #######################
filebeat.inputs:
- type: log
  enabled: true
  # Do not move the top_log_path variable to the next line; it will break the YAML formatting and the filebeat service will not start.
  paths: <%=@top_log_path %>
  fields_under_root: true
  processors:
    - dissect:
        tokenizer:[%{?messageTime}] %{log.level}
        field:_source.message
        target_prefix:""
        overwrite_keys:true
        ignore_failure:true
  # You must provide a regex multiline.pattern that identifies the start of a new log event
  multiline.pattern: '^\[?\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+\]?'
  multiline.negate: true
  multiline.match: after
  ignore_older: 48h
  close_inactive: 32h
  backoff: 1ms
  # The following configuration works well for log file rotations that happen at midnight.
  # Here we configure the filebeat harvester to look for new files 5 minutes past local midnight
  # and scan every 24 hours thereafter. Without scan_offset you would have to scan much more
  # frequently to pick up new files, at the cost of fruitless scans and iops overhead that will
  # impact your application.
  #scan_frequency: 24h
  #scan_offset: 0h5m
#============================= Filebeat modules ===============================
filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml
  # Set to true to enable config reloading
  reload.enabled: false
  # Period on which files under path should be checked for changes
  #reload.period: 10s

#==================== Elasticsearch template setting ==========================
#setup.template.settings:
#  index.number_of_shards: 1
#  index.codec: best_compression
#  _source.enabled: false

#================================ General =====================================
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
fields:
  environment.name: "<%= @top_kafka_environment%>"
  # name: edm-logs
  # type: edm
fields_under_root: true

#============================== Kibana =====================================
#setup.kibana:
#  host: "https://kibana.main.dev.top.rd.elliemae.io"
#  space.id: "sandbox"

#================================ Outputs =====================================

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["beats.intake.<%= @top_kibana_environment%>.top.elliemae.io:443"]
  bulk_max_size: 2048
  index: "<%= @top_kibana_index_name%>"
  ssl.verification_mode: none
  enabled: true
  ssl.enabled: true
  pipelining: 0
  ttl: 120
  backoff.init: 1s
  backoff.max: 60s
  max_retries: 10
  timeout: 30s
  compression_level: 5
  loadbalance: true

#================================ Logging =====================================
# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
  path: "/var/log/filebeat"
  name: "filebeat"
  keepfiles: 7
  permissions: 0644
# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
logging.selectors: ["*"]

#================================= Migration ==================================
# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true
Got it working. I finally used the dissect processor. The problem was YAML indentation; I validated my YAML using http://www.yamllint.com/.
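For future readers, Filebeat itself can also sanity-check the file (assuming the default path on the host):

filebeat test config -c /etc/filebeat/filebeat.yml

It either reports that the config is OK or points at the offending key, which is quicker than restarting the service and digging through the logs.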
Below is my filebeat.yml; it may be helpful to future readers.
[root@ep3vebkfk100014 CLOUD\anadkarni]# cat /etc/filebeat/filebeat.yml
###################### Filebeat Configuration #######################
filebeat.inputs:
- type: log
  enabled: true
  # Do not move the top_log_path variable to the next line; it will break the YAML formatting and the filebeat service will not start.
  paths:
    - "/var/log/kafka/kafka.log"
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: '[%{?message.time}] %{log.level} %{?discard.this}'
        field: message
        target_prefix: ""
        overwrite_keys: true
  # You must provide a regex multiline.pattern that identifies the start of a new log event
  multiline.pattern: '^\[?\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+\]?'
  multiline.negate: true
  multiline.match: after
  ignore_older: 48h
  close_inactive: 32h
  backoff: 1ms
  # The following configuration works well for log file rotations that happen at midnight.
  # Here we configure the filebeat harvester to look for new files 5 minutes past local midnight
  # and scan every 24 hours thereafter. Without scan_offset you would have to scan much more
  # frequently to pick up new files, at the cost of fruitless scans and iops overhead that will
  # impact your application.
  #scan_frequency: 24h
  #scan_offset: 0h5m
#============================= Filebeat modules ===============================
filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml
  # Set to true to enable config reloading
  reload.enabled: false
  # Period on which files under path should be checked for changes
  #reload.period: 10s

#==================== Elasticsearch template setting ==========================
#setup.template.settings:
#  index.number_of_shards: 1
#  index.codec: best_compression
#  _source.enabled: false

#================================ General =====================================
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
fields:
  environment.name: "dev"
  # name: edm-logs
  # type: edm
fields_under_root: true

#============================== Kibana =====================================
#setup.kibana:
#  host: "https://kibana.main.dev.top.rd.elliemae.io"
#  space.id: "sandbox"

#================================ Outputs =====================================

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["beats.intake.nonprod.top.elliemae.io:443"]
  bulk_max_size: 2048
  index: "kafka-broker-ch3"
  ssl.verification_mode: none
  enabled: true
  ssl.enabled: true
  pipelining: 0
  ttl: 120
  backoff.init: 1s
  backoff.max: 60s
  max_retries: 10
  timeout: 30s
  compression_level: 5
  loadbalance: true

#================================ Logging =====================================
# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
  path: "/var/log/filebeat"
  name: "filebeat"
  keepfiles: 7
  permissions: 0644
# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
logging.selectors: ["*"]

#================================= Migration ==================================
# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true
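If you want to verify the new field before events leave the box, one temporary tweak (not part of the config above) is to comment out output.logstash, enable console output instead, and run Filebeat in the foreground:

output.console:
  pretty: true

Running filebeat -e then prints every published event, and the extracted level should appear under log.level next to the original message.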