Hi there!
I have a Filebeat config (see further below) that is currently working. It's supposed to read a log file written in JSON and then send it, in this case to a Kafka topic. The log message is stored under a JSON key named 'json'.
Let's say, for example, that a log entry is like:
{"@timestamp": "2020-10-08T12:26:30+0000", "level": "INFO", "message": "takes the value and converts it to string."}
So after Filebeat reads it, it would be stored as something like:
....
  "fields" : {
    "environment" : "int"
  },
  "@version" : "1",
  "@timestamp" : "2020-10-08T10:17:33.961Z",
  "ecs" : {
    "version" : "1.5.0"
  },
  "input" : {
    "type" : "log"
  },
  "log" : {
    "file" : {
      "path" : "/var/log/logs-app/access.log"
    },
    "offset" : 17599
  },
  "host" : {
    "name" : "log-app"
  },
  "json" : {
    "message" : "takes the value and converts it to string.",
    "@timestamp" : "2020-10-08T10:17:33+0000",
    "level" : "INFO"
  }
}
Is there a way to unwrap the key json into the root document? I saw that in Filebeat there is a JSON processor, but I couldn't find a way to do it.
The same thing happens to me when I add the environment variable CONTEXT_ENVIRONMENT: it creates an object named fields. Is there a way to merge it into an existing one?
The following is the config for Filebeat:
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: default
  labels:
    k8s-app: filebeat
    kubernetes.io/cluster-service: "true"
data:
  filebeat.yml: |-
    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - ${APP_LOG}
      fields:
        environment: ${CONTEXT_ENVIRONMENT}
      json_keys_under_root: true
      json.add_error_key: true
    - type: log
      enabled: true
      paths:
        - ${ACCESS_LOG}
      fields:
        environment: ${CONTEXT_ENVIRONMENT}
      json_keys_under_root: true
      json.add_error_key: true
    filebeat.config.modules:
      path: ${path.config}/modules.d/*.yml
      reload.enabled: false
    output.kafka:
      enabled: true
      hosts: '${KAFKA_URL}'
      topic: '${KAFKA_TOPIC}'
      partition.round_robin:
        reachable_only: false
      compression: gzip
    logging.level: warning
    logging.selectors: ["*"]
I don't know if I can do this config via Filebeat. Currently I'm trying to do it with Logstash:
kafka-appender.conf: |
  input {
    kafka{
      group_id => "logstash"
      topics => ["test"]
      bootstrap_servers => "kafka.default.svc.cluster.local:9092"
      codec => json
    }
  }
  filter {
    json {
      source => "json"
      target => "app-trace"
    }
  }
  output {
    elasticsearch {
      index => "logstash-kafka-%{+YYYY.MM}"
      hosts => [ "${ES_HOSTS}" ]
      user => "${ES_USER}"
      password => "${ES_PASSWORD}"
      cacert => '/etc/logstash/certificates/ca.crt'
    }
  }
I also tried with an empty target (which is what I really want).
The thing now is that I'm having an error at the filter level:
[WARN ] 2020-10-08 14:50:11.050 [[kafka]>worker0] json - Error parsing json {:source=>"json", :raw=>{"message"=>"variable not in use.", "level"=>"WARN", "@timestamp"=>"2020-10-08T14:50:08+0000"}, :exception=>java.lang.ClassCastException: class org.jruby.RubyHash cannot be cast to class org.jruby.RubyIO (org.jruby.RubyHash and org.jruby.RubyIO are in unnamed module of loader 'app')}
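Added example, not part of the original post and not the poster's own config: a Filebeat `log` input that decodes each JSON line into the event root and puts the custom field at the root as well, using Filebeat's `json.keys_under_root` and `fields_under_root` input options. The path and environment variable names are taken from the post; placing the options at input level is an assumption about the intended layout.

```yaml
# Added example: body of filebeat.yml for one input (the second input is analogous).
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - ${APP_LOG}

  # Custom fields normally land under a "fields" object.
  # fields_under_root moves them to the top level of the event.
  fields:
    environment: ${CONTEXT_ENVIRONMENT}
  fields_under_root: true

  # JSON decoding options use the dotted "json." prefix.
  # keys_under_root copies the decoded keys (message, level, ...) to the
  # event root instead of nesting them under a "json" object.
  json.keys_under_root: true

  # Adds error.message / error.type to the event when a line cannot be
  # decoded, so malformed lines are visible downstream rather than lost.
  json.add_error_key: true

  # Caveat: with keys_under_root, key names chosen by the application share
  # a namespace with Filebeat's own metadata (host, log, input, @timestamp).
  # json.overwrite_keys is left unset here, so on a name clash the values
  # Filebeat sets are kept and the application's value does not replace them.
```

With the keys already at the root when the event reaches Kafka, the Logstash `json` filter on the `json` field is no longer needed, since the `json` codec on the Kafka input already yields a structured event.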