Hi all!
I have a question about parsing JSON log messages into separate fields in Kibana.
I use fluentd to collect logs from my k8s cluster.
Here is the config:
containers.input.conf: |-
  <source>
    @id fluentd-containers.log
    @type tail
    path /var/log/containers/*.log
    pos_file /var/log/es-containers.log.pos
    tag raw.kubernetes.*
    read_from_head true
    <parse>
      @type multi_format
      <pattern>
        format json
        time_key time
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </pattern>
      <pattern>
        format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
        time_format %Y-%m-%dT%H:%M:%S.%N%:z
      </pattern>
    </parse>
  </source>
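  # For reference, a raw line in /var/log/containers/*.log (assuming the Docker
  # json-file logging driver) looks roughly like the hypothetical line below;
  # note that the application's own JSON stays escaped inside the "log" value:
  #   {"log":"{\"level\":\"info\",\"msg\":\"user logged in\"}\n","stream":"stdout","time":"2021-11-18T21:26:53.975789Z"}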
  # Detect exceptions in the log output and forward them as one log entry.
  <match raw.kubernetes.**>
    @id raw.kubernetes
    @type detect_exceptions
    remove_tag_prefix raw
    message log
    stream stream
    multiline_flush_interval 5
    max_bytes 500000
    max_lines 1000
  </match>
system.input.conf: |-
  <source>
    @id systemd.log
    @type systemd
    tag systemd
    read_from_head true
    <storage>
      @type local
      persistent true
      path /var/log/systemd.log.pos
    </storage>
    <entry>
      field_map {"MESSAGE": "log", "_PID": ["process", "pid"], "_CMDLINE": "process", "_COMM": "cmd"}
      field_map_strict false
      fields_strip_underscores true
      fields_lowercase true
    </entry>
  </source>

  # Example:
  # I1118 21:26:53.975789 6 proxier.go:1096] Port "nodePort for kube-system/default-http-backend:http" (:31429/tcp) was open before and is still needed
  <source>
    @id kube-proxy.log
    @type tail
    format multiline
    multiline_flush_interval 5s
    format_firstline /^\w\d{4}/
    format1 /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<log>.*)/
    time_format %m%d %H:%M:%S.%N
    path /var/log/kube-proxy.log
    pos_file /var/log/es-kube-proxy.log.pos
    tag kubeproxy
    read_from_head true
  </source>
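  # For the example line above, format1 extracts roughly:
  #   severity="I", time="1118 21:26:53.975789", pid="6", source="proxier.go:1096",
  #   and the rest of the line goes into "log".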
output.conf: |-
  <filter kubeproxy>
    @type record_transformer
    enable_ruby
    <record>
      hostname ${ENV["HOSTNAME"]}
    </record>
  </filter>

  <filter **>
    @type prometheus
    <metric>
      type counter
      name fluentd_input_status_num_records_total
      desc Total number of log entries generated by either application containers or system components
    </metric>
  </filter>

  <filter kubernetes.**>
    @type kubernetes_metadata
  </filter>

  <match **>
    @id elasticsearch
    @type elasticsearch
    @log_level info
    include_tag_key true
    host ${HOST}
    port 9200
    scheme https
    ssl_verify false
    ssl_version TLSv1_2
    user ${USER}
    password ${PASSWORD}
    logstash_format true
    logstash_prefix ${INDEX_NAME}
    <buffer>
      @type file
      path /var/log/fluentd-buffers/kubernetes.system.buffer
      flush_mode interval
      retry_type exponential_backoff
      flush_thread_count 2
      flush_interval 5s
      retry_forever
      retry_max_interval 30
      chunk_limit_size 2M
      queue_limit_length 8
      overflow_action block
    </buffer>
  </match>
Here is how all logs, including the JSON ones, currently arrive in the index (the highlighted fields inside the JSON log string are the ones that need to be parsed out into individual fields):
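For illustration (the field names inside the JSON are made up, just to show the shape), a document in the index currently looks something like this, with the whole application JSON still sitting inside the log string:

  {
    "log": "{\"level\":\"info\",\"msg\":\"user logged in\",\"user_id\":42}\n",
    "stream": "stdout",
    "kubernetes": { "namespace_name": "default", "pod_name": "my-app-1" },
    "@timestamp": "2021-11-18T21:26:53.975Z"
  }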
Here is how it should look:
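Roughly the same document, but with the keys from the JSON payload promoted to their own fields (again, only an illustration):

  {
    "log": "{\"level\":\"info\",\"msg\":\"user logged in\",\"user_id\":42}\n",
    "level": "info",
    "msg": "user logged in",
    "user_id": 42,
    "stream": "stdout",
    "kubernetes": { "namespace_name": "default", "pod_name": "my-app-1" },
    "@timestamp": "2021-11-18T21:26:53.975Z"
  }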
But this index also contains non-JSON logs, and those must not be broken.
I know that I could use an Elasticsearch ingest pipeline with Filebeat, but in my case I need to achieve the same thing with fluentd.
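The closest thing I have found so far is fluentd's parser filter, placed before the elasticsearch <match **> in output.conf. Below is a rough sketch of what I have in mind; the option values are just my reading of the docs, and I have not verified that they leave the non-JSON logs untouched:

  <filter kubernetes.**>
    @type parser
    key_name log                         # parse the JSON that sits inside the "log" field
    reserve_data true                    # keep the existing fields (stream, kubernetes.*, ...)
    remove_key_name_field false          # also keep the original "log" string
    emit_invalid_record_to_error false   # plain-text lines should not go to the error stream
    <parse>
      @type json
    </parse>
  </filter>

In particular, I am not sure whether records that fail JSON parsing pass through unchanged with these settings.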
Does anyone have ideas?
I would be glad for any help!