Hi guys,
Please assist if you have a spare minute or two.
The logging stack is as follows:
ES v.6.3.1 as a service on AWS.
Fluentd version is 1.3.3
Grok plugin as described here: https://github.com/fluent/fluent-plugin-grok-parser
Fluentd runs as a pod on each node of the K8S cluster.
It aggregates the log data using grok and sends it to AWS ES.
The issue is that every field, regardless of the typed grok syntax (e.g. %{NUMBER:field_name:integer}), is created inside ES as a string. There are no Fluentd errors or warnings and all the data is written, but the field type is wrong: everything ends up as a string.
We create an index once per day. All the fields and their names are the same in each new and old index.
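For reference, this is how the wrong types can be seen on the ES side (a sketch; the index name follows from the logstash_prefix prod.k8s in the output config below, and the date is only an example):

# Inspect the mapping ES generated for one of the daily indices
curl -s "https://$OUTPUT_HOST/prod.k8s-2019.02.01/_mapping?pretty"
# Fields such as "response" or "request_length" come back as
# "type": "text" (plus a "keyword" sub-field), i.e. string, instead of integer/float.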
Fluentd's input config:
<source>
  @id fluentd-containers.log
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag raw.kubernetes.*
  read_from_head true
  # GROK patterns:
  <parse>
    @type grok
    grok_name_key grok_name
    grok_failure_key grokfailure
    time_format "%d/%b/%Y:%H:%M:%S %z"
    <grok>
      name htweb_nginx
      pattern %{IP:real_ip} - .* - - \\[%{DATA:ingress_time}\\] \\\\"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|-)\\\\" %{NUMBER:response:integer} (?:%{NUMBER:bytes_sent:integer}|-) (?:\\\\"(?:%{URI:referrer}|-)\\\\") \\\\"(?:%{GREEDYDATA:user_agent}|-)\\\\" %{NUMBER:request_length:integer} (?:%{NUMBER:response_time:float}|-) \\[(?:%{DATA:upstream_proxy}|-)\\] (?:%{DATA:upstream_addr}:3000|-) (?:%{NUMBER:upstream_length:integer}|-) (?:%{NUMBER:upstream_time:float}?|-) (?:%{NUMBER:upstream_response_result:integer}?|-) %{WORD:request_id}
    </grok>
    <grok>
      name APP_EVENT_PAGE_v5
      pattern %{TIMESTAMP_ISO8601:timestamp} \\\\"%{NOTSPACE:app_host}\\\\" \\\\"%{WORD:request_id}\\\\" \\\\"%{WORD:event_type}\\\\" \\\\"PAGE\\\\" %{NUMBER:logfile_page_type_version} %{IP:clientip} %{WORD:verb} %{NUMBER:response} \\\\"%{NOTSPACE:controller}\\\\" \\\\"%{WORD:action}\\\\" \\\\"%{NOTSPACE:request}\\\\" \\\\"{\\\\"%{DATA:request_params}}\\\\" \\\\"%{NUMBER:user_id}\\\\" \\\\"%{NUMBER:admin_user_id}\\\\" \\\\"%{DATA:user_agent}\\\\" \\\\"%{DATA:referer}\\\\" %{NUMBER:view_runtime} %{NUMBER:db_runtime} %{NUMBER:solr_runtime}
    </grok>
    <grok>
      name ht_web
      pattern \\[%{DATA:request_id}\\] method=%{WORD:method} path=%{NOTSPACE:path} format=%{NOTSPACE:format} controller=%{NOTSPACE:controller} action=%{WORD:action} status=%{NUMBER:status} duration=%{NUMBER:duration} view=%{NUMBER:view} db=%{NUMBER:db}
    </grok>
    <grok>
      name ht_web_error
      pattern \\[%{DATA:request_id}\\] %{NOTSPACE:controller} \\(%{GREEDYDATA:error_message}\\)
    </grok>
    <grok>
      name ht_web_error_trace
      pattern \\[%{DATA:request_id}\\] %{GREEDYDATA:error_message} \\`%{WORD:error_type}\\'
    </grok>
    <grok>
      name rest_message
      pattern %{GREEDYDATA:log}
    </grok>
  </parse>
</source>
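One way to double-check that the grok type coercion itself works (a sketch for a test instance; filter_stdout is part of Fluentd core and passes events through unchanged) would be to print the parsed records before they leave Fluentd:

<filter raw.kubernetes.**>
  # Debug only: dump each parsed record to Fluentd's own log so the
  # types produced by grok (Integer/Float vs String) can be inspected.
  @type stdout
</filter>

In the printed records, coerced fields should appear without quotes, e.g. "response":200 rather than "response":"200".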
# Detect exceptions in the log output and forward them as one log entry.
<match raw.kubernetes.**>
  @id raw.kubernetes
  @type detect_exceptions
  remove_tag_prefix raw
  message log
  stream stream
  multiline_flush_interval 5
  max_bytes 500000
  max_lines 1000
</match>
Fluentd's output config:
<match **>
  @id elasticsearch_other
  @type elasticsearch
  @log_level info
  type_name fluentd
  include_tag_key true
  host "#{ENV['OUTPUT_HOST']}"
  port "#{ENV['OUTPUT_PORT']}"
  scheme https # support for AWS ES
  logstash_format true
  logstash_prefix prod.k8s
  ### AWS ElasticSearch needs this set to false. See
  ### https://discuss.elastic.co/t/elasitcsearch-ruby-raises-cannot-get-new-connection-from-pool-error/36252/10
  reload_connections false
  slow_flush_log_threshold 30s
  ssl_version TLSv1_2
  <buffer>
    @type file
    path /var/log/fluentd-buffers/kubernetes.other.system.buffer
    #flush_mode immediate
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 10
    flush_interval 5s
    retry_forever
    retry_max_interval 30
    chunk_limit_size "#{ENV['OUTPUT_BUFFER_CHUNK_LIMIT']}"
    queue_limit_length "#{ENV['OUTPUT_BUFFER_QUEUE_LIMIT']}"
    overflow_action block
  </buffer>
</match>
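For reference, a minimal sketch of an explicit ES 6.x index template that would pin the numeric types for newly created daily indices (field list abbreviated to two examples; the doc type matches the type_name fluentd above):

# Applied once against the cluster; only indices created afterwards pick it up
curl -s -XPUT "https://$OUTPUT_HOST/_template/prod.k8s" \
  -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["prod.k8s-*"],
  "mappings": {
    "fluentd": {
      "properties": {
        "response":      { "type": "integer" },
        "response_time": { "type": "float" }
      }
    }
  }
}'

Existing indices keep whatever mapping they were created with; a template only affects indices created after it is put in place.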
In addition: we only started setting explicit field types (:integer and the others) some time after log aggregation had already begun, i.e. the fields had already been declared as string in the previous indices.