Pipeline processor with IF condition

Hi Team,

We want to use a conditional pipeline processor based on the operationName field in the JSON document below.

If "operationName" is "/searchRIBCustomerInMDM | request/response", then the pipeline should execute.

{
  "_index": "jaeger-span-2020-06-23",
  "_type": "_doc",
  "_id": "OI_f4XIBs_kTi5esZ9i3",
  "_version": 1,
  "_score": null,
  "_source": {
    "traceID": "c924f366df87a9da",
    "process": {
      "tag": {
        "opencensus@language": "GO_LANG",
        "opencensus@corelibversion": "0.23.0",
        "pid": 1,
        "opencensus@exporterversion": "0.0.1"
      },
      "serviceName": "IDP-API",
      "tags": []
    },
    "references": [
      {
        "traceID": "c924f366df87a9da",
        "spanID": "ae4f4f3e25e3a7cd",
        "refType": "CHILD_OF"
      }
    ],
    "startTimeMillis": 1592927410125,
    "description": "",
    "operationName": "/searchRIBCustomerInMDM | request/response",
    "tags": [],
    "spanID": "91753f987519a6b6",
    "duration": 1900773,
    "startTime": 1592927410125181,
    "tag": {
      "http@status_code": 200,
      "internal@span@format": "jaeger"
    }
  },
  "fields": {
    "startTimeMillis": [
      "2020-06-23T15:50:10.125Z"
    ]
  },

  "sort": [
    "2020-06-23T13:28:29.398827502Z"
  ]
}

Regards,
Jalpesh

Maybe have a look at https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-conditionals.html and https://www.elastic.co/guide/en/elasticsearch/reference/current/conditionals-with-multiple-pipelines.html#conditionals-with-multiple-pipelines

Hi Dadoonet,

I have already gone through this documentation, but I could not find a proper explanation with an example.

Can you help with the Painless condition?

Regards,
Jalpesh

The second link has an example. Did you try it?

If you still can't make it work, share a full reproduction script using just the _simulate endpoint and we can hopefully start from here to help.
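For reference, a reproduction of that kind can be as small as one inline pipeline and a couple of test documents. The following is only a minimal sketch: the `matched` field and the `somethingElse` value are hypothetical, chosen for illustration, and the condition is attached to a `set` processor simply because it needs no other pipeline to exist.

```
# Sketch only: "matched" and "somethingElse" are made-up names for illustration.
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "if": "ctx.operationName == 'searchRIBCustomerInMDM'",
          "field": "matched",
          "value": true
        }
      }
    ]
  },
  "docs": [
    { "_source": { "operationName": "searchRIBCustomerInMDM" } },
    { "_source": { "operationName": "somethingElse" } }
  ]
}
```

Keeping the test documents down to the fields the condition actually reads makes it easier to see whether the `if` expression itself is the problem.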

Hi,

Please find the _simulate request below.
In this example, if "operationName" is "searchRIBCustomerInMDM", then the pipeline named "pipelineABIF" should execute.
Here "operationName" is inside the _source object, which might be the issue.

POST /_ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
  "processors": [
    {
      "pipeline": {
        "if": "ctx.operationName?.name == 'createRIBCase'",
        "name": "pipelineABIF2"
      }
    },
    {
      "pipeline": 
      {
        "if": "ctx.operationName?.name == 'searchRIBCustomerInMDM'",
        "name": "pipelineABIF"
      }
    },
    {
      "fail": {
        "if": "ctx.operationName?.name != 'createRIBCase' && ctx.operationName?.name != 'searchRIBCustomerInMDM'",
        "message": "This pipeline requires operationName.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
  },
  "docs": 
  [
{
  "_index": "jaeger-span-2020-06-23",
  "_type": "_doc",
  "_id": "OI_f4XIBs_kTi5esZ9i3",
  "_version": 1,
  "_score": null,
  "_source": {
    "traceID": "c924f366df87a9da",
    "process": {
      "tag": {
        "hostname": "idp-api-5dfcb7c8d8-jmpdx"
      },
      "serviceName": "IDP-API",
      "tags": []
    },
    "references": [
      {
      }
    ],
    "startTimeMillis": 1592927410125,
    "description": "",
    "operationName": "searchRIBCustomerInMDM",
    "responseData": "{}",
    "tags": [],
    "spanID": "91753f987519a6b6",
    "duration": 1900773,
    "startTime": 1592927410125181,
    "tag": {
    },
    "requestData": "{}",
    "logs": [
      {
        "fields": [
          {
          },
          {
          }
        ],
        "timestamp": 1592927410125196
      },
      {
        "fields": [
          {

          },
          {

          }
        ],
        "timestamp": 1592927412025950
      }
    ],
    "descriptionData": "IBCustomerInMDM"
  },
  "fields": {
    "startTimeMillis": [
      "2020-06-23T15:50:10.125Z"
    ]
  },
  "highlight": {
    "responseData": [
      "{gbgfh}"
    ],
    "operationName": [
          ]
  },
  "sort": [
    "2020-06-23T13:28:29.398827502Z"
  ]
}
  ]
}

When executing that, I'm getting:

{
  "error" : {
    "root_cause" : [
      {
        "type" : "class_cast_exception",
        "reason" : "class java.lang.Integer cannot be cast to class java.lang.Long (java.lang.Integer and java.lang.Long are in module java.base of loader 'bootstrap')"
      }
    ],
    "type" : "class_cast_exception",
    "reason" : "class java.lang.Integer cannot be cast to class java.lang.Long (java.lang.Integer and java.lang.Long are in module java.base of loader 'bootstrap')"
  },
  "status" : 500
}

So next time, please check that the script you are sharing can actually be executed.

Anyway, I wrote this script which works:

PUT /_ingest/pipeline/foo
{
  "processors": [
    {
      "set": {
        "field": "pipeline",
        "value": "foo"
      }
    }
  ]
}
PUT /_ingest/pipeline/bar
{
  "processors": [
    {
      "set": {
        "field": "pipeline",
        "value": "bar"
      }
    }
  ]
}
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "_description",
    "processors": [
      {
        "pipeline": {
          "if": "ctx.operationName == 'createRIBCase'",
          "name": "foo"
        }
      },
      {
        "pipeline": {
          "if": "ctx.operationName == 'searchRIBCustomerInMDM'",
          "name": "bar"
        }
      },
      {
        "fail": {
          "if": "ctx.operationName != 'createRIBCase' && ctx.operationName != 'searchRIBCustomerInMDM'",
          "message": "This pipeline requires operationName to be either `createRIBCase` or `searchRIBCustomerInMDM`"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "operationName": "searchRIBCustomerInMDM"
      }
    }
  ]
}

That gives:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "pipeline" : "bar",
          "operationName" : "searchRIBCustomerInMDM"
        },
        "_ingest" : {
          "timestamp" : "2020-06-26T08:05:21.480721Z"
        }
      }
    }
  ]
}
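
The original question used the longer value "/searchRIBCustomerInMDM | request/response", which an exact `==` comparison against 'searchRIBCustomerInMDM' would not match. Below is a minimal sketch of one way to handle that, assuming a substring match is acceptable and reusing the `bar` pipeline defined above. The null check and the `contains` call are an assumption on my part and were not tested in this thread.

```
# Sketch only: substring match instead of exact equality; reuses the "bar" pipeline from above.
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "pipeline": {
          "if": "ctx.operationName != null && ctx.operationName.contains('searchRIBCustomerInMDM')",
          "name": "bar"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "operationName": "/searchRIBCustomerInMDM | request/response"
      }
    }
  ]
}
```

Note that `ctx` exposes the fields of _source directly, so `ctx.operationName` is the string itself. It has no `.name` sub-field, which is why the working conditions compare `ctx.operationName` rather than `ctx.operationName?.name`.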
