Processing a JSON file in Filebeat

Hi there!
I have a Filebeat config (see further below) that is currently working. It is supposed to read a log file written in JSON and send it, in this case, to a Kafka topic. The log message ends up stored under a JSON key named 'json'.

Let's say, for example, that a log entry looks like this:

{"@timestamp": "2020-10-08T12:26:30+0000", "level": "INFO", "message": "takes the value and converts it to string."}

After Filebeat reads it, it is stored as something like:

      "fields" : {
        "environment" : "int"
      },
      "@version" : "1",
      "@timestamp" : "2020-10-08T10:17:33.961Z",
      "ecs" : {
        "version" : "1.5.0"
      },
      "input" : {
        "type" : "log"
      },
      "log" : {
        "file" : {
          "path" : "/var/log/logs-app/access.log"
        },
        "offset" : 17599
      },
      "host" : {
        "name" : "log-app"
      },
      "json" : {
        "message" : "takes the value and converts it to string.",
        "@timestamp" : "2020-10-08T10:17:33+0000",
        "level" : "INFO"
      }

Is there a way to unwrap the json key into the root of the document? I saw that Filebeat has a JSON processor, but I couldn't find a way to do it with that.
The same thing happens when I add the environment variable CONTEXT_ENVIRONMENT: it creates an object named fields. Is there a way to merge it into an existing one?
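
Both questions can probably be handled in Filebeat itself. The following is a minimal sketch, not tested against this setup. It assumes the input has no json.* options, so each raw JSON line arrives in the message field, and that the JSON processor mentioned above is decode_json_fields. The option fields_under_root stores custom fields at the top level instead of under fields, and an empty target tells decode_json_fields to merge the decoded keys into the root of the event.

```yaml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - ${APP_LOG}
  fields:
    environment: ${CONTEXT_ENVIRONMENT}
  # Store "environment" at the top level of the event instead of under "fields".
  fields_under_root: true

processors:
  # Decode the raw JSON line held in "message" and merge its keys into the root.
  - decode_json_fields:
      fields: ["message"]
      # An empty target means the decoded keys are placed at the event root.
      target: ""
      # Let decoded keys (for example "message") replace existing ones.
      overwrite_keys: true
      add_error_key: true
```

Without overwrite_keys: true, the decoded message key would not replace the existing message field, which would still hold the raw JSON line.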

The following is the config for Filebeat:

apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: default
  labels:
    k8s-app: filebeat
    kubernetes.io/cluster-service: "true"
data:
  filebeat.yml: |-
    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - ${APP_LOG}
      fields:
        environment: ${CONTEXT_ENVIRONMENT}
      json_keys_under_root: true
      json.add_error_key: true
    - type: log
      enabled: true
      paths:
        - ${ACCESS_LOG}
      fields:
        environment: ${CONTEXT_ENVIRONMENT}
      json_keys_under_root: true
      json.add_error_key: true
    filebeat.config.modules:
      path: ${path.config}/modules.d/*.yml
      reload.enabled: false
    output.kafka:
      enabled: true
      hosts: '${KAFKA_URL}'
      topic: '${KAFKA_TOPIC}'
      partition.round_robin:
        reachable_only: false
      compression: gzip
    logging.level: warning
    logging.selectors: ["*"]

I don't know if I can do this via Filebeat. Currently I'm trying to do it with Logstash:

  kafka-appender.conf: |
    input {
      kafka {
        group_id => "logstash"
        topics => ["test"]
        bootstrap_servers => "kafka.default.svc.cluster.local:9092"
        codec => json
      }
    }
    filter {
      json {
        source => "json"
        target => "app-trace"
      }
    }
    output {
      elasticsearch {
        index => "logstash-kafka-%{+YYYY.MM}"
        hosts => [ "${ES_HOSTS}" ]
        user => "${ES_USER}"
        password => "${ES_PASSWORD}"
        cacert => '/etc/logstash/certificates/ca.crt'
      }
    }

I also tried with an empty target (which is what I really want).

The problem now is that I'm getting an error at the filter level:

[WARN ] 2020-10-08 14:50:11.050 [[kafka]>worker0] json - Error parsing json {:source=>"json", :raw=>{"message"=>"variable not in use.", "level"=>"WARN", "@timestamp"=>"2020-10-08T14:50:08+0000"}, :exception=>java.lang.ClassCastException: class org.jruby.RubyHash cannot be cast to class org.jruby.RubyIO (org.jruby.RubyHash and org.jruby.RubyIO are in unnamed module of loader 'app')}

Please, could you lend me a hand?

I found a way, but I guess there might be a simpler one:

filter {
  json_encode {
    source => "[json]"
    target => "[trace]"
  }
  json {
    source => "[trace]"
  }
}

The json filter did not work because the object named json has already been parsed (the kafka input's json codec turns it into a hash, while the filter expects a string). So I re-encode it with json_encode and then decode it again with the json filter, but it's awful.

Is there a more elegant way to do it? I don't care whether it's with Logstash or Filebeat, whichever solves my problem more efficiently.

I had a typo in the config: json_keys_under_root: true should be json.keys_under_root: true.
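
For reference, a sketch of one input with the corrected key, assuming the rest of the config stays as posted. The json.overwrite_keys line is an optional addition that was not in the original config. With json.keys_under_root enabled, the log input places the decoded keys at the top level of the event rather than under json, so neither the json_encode round trip nor a json filter in Logstash should be needed.

```yaml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - ${APP_LOG}
  fields:
    environment: ${CONTEXT_ENVIRONMENT}
  # Corrected key: "json.keys_under_root", not "json_keys_under_root".
  json.keys_under_root: true
  # Optional (not in the original config): on a name conflict, let decoded
  # keys replace the fields Filebeat adds itself.
  json.overwrite_keys: true
  json.add_error_key: true
```

The same change would apply to the second input that reads ${ACCESS_LOG}.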
