Incorrect JSON log parsing

Hi. I have logs in JSON format that look like this:

{
	"dt":"2023-01-18T17:41:04.8723262+00:00",
	"tz":"Etc/UTC",
	"host":"host-name",
	"containerName":"container-name",
	"level":"INFO",
	"scope":"Web API",
	"message":"exiting web api method",
	"callerMemberName":"OnResultExecuted",
	"callerFilePath":"/path",
	"callerLineNumber":94,
	"context":
		{
			"StatusCode":200,
			"ControllerName":"name",
			"ActionName":"InsertAsync"
		},
	"exception":null,
	"executionId":"1234567890abcdefghig",
	"traceId":"1234567890abcdefghig"
}

And here is the "decode_json_fields" processor in my filebeat.yml:

filebeat.inputs:
- type: container
  paths:
  - /var/log/containers/*.log
  include_lines: ['DEBUG', 'INFO', 'ERROR', 'WARN', 'WARNING', 'FATAL', 'CRITICAL']
  #exclude_files: ['.gz$']
  multiline.pattern: ^\d
  multiline.negate: true
  multiline.match: after
  processors:
  - add_kubernetes_metadata:
      host: ${NODE_NAME}
      matchers:
      - logs_path:
          logs_path: "/var/log/containers/"
  - decode_json_fields:
      fields: ["level"]
      target: ""

I want to extract only the "level" field so that I can later display it in Kibana.
Right now Kibana displays an incorrect log level (I know that such a display indicates an error in the parser configuration).

Alternatively, if someone can advise an approach to parse all of the JSON data into individual fields in Kibana, that would work for me too.
Thanks

I'd suggest sending the message as-is from Filebeat and configuring an ingest pipeline that processes your data to your requirements. That way you keep all the ingest logic close to your cluster.

Taking your input, extracting the level key from the message is quite straightforward with the json processor, as this simulated query shows:

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "_description",
    "processors": [
      {
        "json": {
          "field": "message",
          "add_to_root": true
        }
      },
      {
        "remove": {
          "field": [
            "callerMemberName", "exception",
            "traceId", "tz",
            "callerFilePath", "message",
            "callerLineNumber", "dt",
            "executionId", "containerName",
            "scope", "host", "context"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "message": """{
      	"dt":"2023-01-18T17:41:04.8723262+00:00",
      	"tz":"Etc/UTC",
      	"host":"host-name",
      	"containerName":"container-name",
      	"level":"INFO",
      	"scope":"Web API",
      	"message":"exiting web api method",
      	"callerMemberName":"OnResultExecuted",
      	"callerFilePath":"/path",
      	"callerLineNumber":94,
      	"context":
      		{
      			"StatusCode":200,
      			"ControllerName":"name",
      			"ActionName":"InsertAsync"
      		},
      	"exception":null,
      	"executionId":"1234567890abcdefghig",
      	"traceId":"1234567890abcdefghig"
      }"""
      }
    }
  ]
}

The simulated output of that pipeline is:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_id": "id",
        "_version": "-3",
        "_source": {
          "level": "INFO"
        },
        "_ingest": {
          "timestamp": "2023-01-19T15:42:59.808712161Z"
        }
      }
    }
  ]
}
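Once the simulated output looks right, you could create the pipeline and point Filebeat at it. A minimal sketch, assuming a hypothetical pipeline name `json-logs` (the `ignore_missing` option on the remove processor just keeps the pipeline from failing on documents that lack one of the fields):

```json
PUT _ingest/pipeline/json-logs
{
  "description": "Parse container JSON logs, keep only the level field",
  "processors": [
    {
      "json": {
        "field": "message",
        "add_to_root": true
      }
    },
    {
      "remove": {
        "field": [
          "message", "dt", "tz", "host", "containerName",
          "scope", "callerMemberName", "callerFilePath",
          "callerLineNumber", "context", "exception",
          "executionId", "traceId"
        ],
        "ignore_missing": true
      }
    }
  ]
}
```

Then reference the pipeline in the Elasticsearch output section of filebeat.yml (host is a placeholder):

```yaml
output.elasticsearch:
  hosts: ["https://your-es-host:9200"]
  pipeline: "json-logs"
```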

Of course you can keep any other fields by taking them out of the remove processor, and you should first define the mapping of your data, preferably with an index template.
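For example, a minimal composable index template mapping `level` as a keyword could look like this (the template name and index pattern are assumptions, adjust them to your index naming):

```json
PUT _index_template/container-logs
{
  "index_patterns": ["filebeat-container-*"],
  "template": {
    "mappings": {
      "properties": {
        "level": { "type": "keyword" }
      }
    }
  }
}
```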
