Detecting JSON in Ingest pipeline


(Eugene Bolshakoff) #1

Hi all!

I have the following pipeline:
{
  "description": "Ingest pipeline",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{TIMESTAMP_ISO8601:logtime} %{WORD:loglevel} - %{GREEDYDATA:logdata}"]
      }
    },
    {
      "json": {
        "field": "logdata",
        "add_to_root": true
      }
    },
    {
      "remove": {
        "field": ["message", "logdata"]
      }
    }
  ]
}

The log line looks like this (it comes from our app and is then processed by Filebeat):
2018-10-08 13:39:36,247 INFO - {"a": "b"....}

But sometimes our app emits log lines whose payload isn't JSON:
2018-10-08 08:54:18,592 INFO - user 10470684 cannot assume zuid 10470684

In this case the pipeline fails. If only this log line failed to appear in EFK, that would not be a problem; the real problem is that ingestion stops completely, and I have to restart Filebeat.

For now I skip those lines in the Filebeat config, but I can't be sure I've added every problematic line to it. I would like to:

  • just skip the line if it's not JSON
  • or (better, but not mandatory) add it as-is (e.g. "message": "my logline") if it's not JSON

(Jake Landis) #2

You should be able to use the on_failure handler to accomplish this. https://www.elastic.co/guide/en/elasticsearch/reference/current/handling-failure-in-pipelines.html

For example:

PUT _ingest/pipeline/test1
{
  "description": "Ingest pipeline",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:logtime} %{WORD:loglevel} - %{GREEDYDATA:logdata}"
        ]
      }
    },
    {
      "json": {
        "field": "logdata",
        "add_to_root": true,
        "on_failure": [
          {
            "grok": {
              "field": "message",
              "patterns": [
                "%{TIMESTAMP_ISO8601:logtime} %{WORD:loglevel} - %{GREEDYDATA:log_line}"
              ]
            }
          }
        ]
      }
    },
    {
      "remove": {
        "field": [
          "message",
          "logdata"
        ]
      }
    }
  ]
}

POST test/_doc/1?pipeline=test1
{
  "message": """2018-10-08 13:39:36,247 INFO - {"a": "b"}"""
}

POST test/_doc/2?pipeline=test1
{
  "message": "2018-10-08 08:54:18,592 INFO - user 10470684 cannot assume zuid 10470684"
}

GET test/_doc/1
GET test/_doc/2
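
If you'd rather check the pipeline's behavior without indexing anything, the simulate API runs sample documents through it and returns the transformed result (using the test1 pipeline defined above):

POST _ingest/pipeline/test1/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2018-10-08 08:54:18,592 INFO - user 10470684 cannot assume zuid 10470684"
      }
    }
  ]
}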

Doc 1 gets parsed as JSON, and doc 2 triggers the JSON processor's on_failure handler, resulting in:

{
  "_index": "test",
  "_type": "_doc",
  "_id": "2",
  "_version": 5,
  "found": true,
  "_source": {
    "loglevel": "INFO",
    "log_line": "user 10470684 cannot assume zuid 10470684",
    "logtime": "2018-10-08 08:54:18,592"
  }
}
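
For intuition, the grok-then-JSON-with-fallback behavior of the pipeline above can be sketched in plain Python. This is illustrative only; it is not how Elasticsearch executes the pipeline, and the regex is a simplified stand-in for the grok pattern:

```python
import json
import re

# Simplified stand-in for:
# %{TIMESTAMP_ISO8601:logtime} %{WORD:loglevel} - %{GREEDYDATA:logdata}
LINE_RE = re.compile(r"^(?P<logtime>\S+ \S+) (?P<loglevel>\w+) - (?P<logdata>.*)$")

def process(message: str) -> dict:
    m = LINE_RE.match(message)
    if not m:
        # grok failure: the whole pipeline fails for this document
        raise ValueError("line does not match pattern")
    doc = {"logtime": m.group("logtime"), "loglevel": m.group("loglevel")}
    try:
        # "json" processor with add_to_root: merge the parsed object into the doc
        doc.update(json.loads(m.group("logdata")))
    except json.JSONDecodeError:
        # on_failure: keep the unparsed remainder as a plain field instead
        doc["log_line"] = m.group("logdata")
    return doc
```

A JSON payload is merged into the document root; anything else lands unparsed in log_line, which mirrors the two indexed results shown above.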

(system) #3

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.