Need for `on_success` with pipelines

In studying https://www.elastic.co/guide/en/elasticsearch/reference/current/handling-failure-in-pipelines.html, I see that "a pipeline defines a list of processors that are executed sequentially, and processing halts at the first exception". This appears to be a hard stop and the stream stops. This behavior

Consider this pipeline:

PUT _ingest/pipeline/rename-log4j-timestamp
{
  "description": "rename the timestamp field in a log4j2 row",
  "processors": [
    {
      "convert": {
        "field": "timeMillis",
        "type": "string",
        "ignore_failure" : true
      }
    },
    {
      "date": {
        "field": "timeMillis",
        "formats": [
          "UNIX_MS"
        ],
        "ignore_failure" : true
      }
    }
  ]
}

If the pipeline gets a row that fits the schema, everything is great, but if there is a problem with a row of the pipeline, the pipeline appears to stall. The semantics of this do not seem to be documented very well, but it is convenient that no data seems to be lost until the pipeline is fixed. Do I understand this correctly?

My bigger problem is how to handle data that does not have timeMillis as a field. In this case, I just want to ignore the record (in my case, accepting the @timestamp that is already there). The definition I cited above states that the pipeline halts at the first exception. My preference is that the pipeline would be ignored if there is a failure anywhere in it.

In my example here, I added "ignore_failure" to each element. That works, but in turn, every pipeline element continues to be evaluated. What if continued processing after a failed pipeline processor would be harmful? In other words, if later stages depend on previous stages, there seems to be no way to gracefully exit the pipeline early and leave the record unmodified, but without halting the pipeline altogether.
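One way to see this for a single record is the simulate endpoint. The request below is a minimal sketch that runs the rename-log4j-timestamp pipeline defined above against two made-up documents, one with timeMillis and one without. The message text and timestamp values are invented for illustration, and it assumes the pipeline has already been stored with the PUT request above.

```console
# Sketch only: the sample documents are hypothetical test data.
POST _ingest/pipeline/rename-log4j-timestamp/_simulate
{
  "docs": [
    { "_source": { "timeMillis": 1500000000000, "message": "row with timeMillis" } },
    { "_source": { "@timestamp": "2017-07-14T02:40:00.000Z", "message": "row without timeMillis" } }
  ]
}
```

Comparing the two results in the response should show whether the second document passes through with its existing @timestamp untouched.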

One solution would be an on_success element that complemented on_failure. In that case, pipeline elements could cascade. A second solution would be for the pipeline definition itself to have a standard failure behavior. In that case, ignore_failure could be removed from each processor. This should lead to more robust code, especially in large pipelines that are maintained by several developers.

Thoughts? I'm new to pipelines and not sure if I am looking at them right.

Thanks! Brian

You can define a failure pipeline either on the full pipeline or on a processor level.

Would that work for you?
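To illustrate the two placements, here is a minimal sketch using the simulate endpoint. The field names date_error, pipeline_error and msg, as well as the sample documents, are made up for the example. As I understand the documentation, a processor-level on_failure catches the error and lets the remaining processors run, while a pipeline-level on_failure runs instead of the remaining processors.

```console
# Sketch only: date_error, pipeline_error and msg are hypothetical field names.
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "on_failure at processor level and at pipeline level",
    "processors": [
      {
        "date": {
          "field": "timeMillis",
          "formats": ["UNIX_MS"],
          "on_failure": [
            { "set": { "field": "date_error", "value": "timeMillis missing or unparsable" } }
          ]
        }
      },
      {
        "rename": { "field": "msg", "target_field": "message" }
      }
    ],
    "on_failure": [
      { "set": { "field": "pipeline_error", "value": "{{ _ingest.on_failure_message }}" } }
    ]
  },
  "docs": [
    { "_source": { "timeMillis": "1500000000000", "msg": "both fields present" } },
    { "_source": { "msg": "no timeMillis" } },
    { "_source": { "timeMillis": "1500000000000" } }
  ]
}
```

The second document exercises the processor-level handler on the date processor. The third has no msg field, so the rename processor fails and the pipeline-level handler takes over.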

Thanks David. Apologies for the lack of experience on this end. Could the failure pipeline act as more of an "ignore" pipeline?

If the failure pipeline stopped future processor execution within the current pipeline and allowed the record to pass unmodified, that would definitely cover my goals.

I hadn't thought of what amounts to a "null failure pipeline". It's a bit like using exception-handling paths for non-exceptional circumstances, which is generally considered an anti-pattern. But this isn't standard programming either. I just want to establish good patterns as I learn and make sure I don't hand off something embarrassing or brittle to the next guy.

Thanks!

@dadoonet: I gave it a try and I get the error "pipeline [p] cannot have an empty on_failure option defined". I don't see a processor that would act as a "null processor".

Interesting.

A workaround is to add a set processor followed by a remove processor to unset what you set...
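A minimal sketch of that workaround, assuming a throwaway field called noop_marker (a made-up name, not anything built in) and reusing the processors from the pipeline earlier in the thread. The pipeline-level on_failure sets the scratch field and immediately removes it, so the handler is non-empty but leaves the document as it found it.

```console
# Sketch only: noop_marker is a hypothetical scratch field; the sample documents are invented.
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "pipeline-level on_failure that sets and then removes a scratch field",
    "processors": [
      { "convert": { "field": "timeMillis", "type": "string" } },
      { "date": { "field": "timeMillis", "formats": ["UNIX_MS"] } }
    ],
    "on_failure": [
      { "set": { "field": "noop_marker", "value": true } },
      { "remove": { "field": "noop_marker" } }
    ]
  },
  "docs": [
    { "_source": { "timeMillis": 1500000000000 } },
    { "_source": { "@timestamp": "2017-07-14T02:40:00.000Z" } }
  ]
}
```

Note that any changes made by processors that succeeded before the failure are not rolled back, so this only leaves the record fully unmodified when the first processor is the one that fails.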

Maybe we need a noop processor? Or support for an empty on_failure pipeline, like the normal pipeline does. @talevy WDYT?

Sounds reasonable. I suppose this use case was not considered at the time. I am for updating the Pipeline to allow empty on_failure blocks.

In the meantime, you can leverage the script processor to do a noop.

Here is an example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "noop pipeline-level on_failure",
    "processors": [
      {
        "convert": {
          "field": "might_exist",
          "type": "string"
        }
      },
      {
        "set": {
          "field": "it_does_exist!",
          "value": "wahoo!"
        }
      }
    ],
    "on_failure": [{"script": {"inline": "'noop'"}}]
  },
  "docs": [
    {"_source": { "might_exist": 1} },
    {"_source": { } }
  ]
}

Hope that helps!

Awesome, thanks guys! Grateful for the feedback. Having a ton of fun with Elastic, great work!

