Ingest Pipeline for conditional statement used in Scripts

Hi,

My goals -

  1. Find the existing field if present (responseBody.results[i].statusCode) and copy its value into a new field (embedded_error_code) at runtime.
  2. My original field is a list (array) of objects. Below is the structure of my original field (responseBody.results[i].statusCode):
 "responseBody": {
      "results": [
        {
          "hasErrors": false,
          "statusCode": 201,
          "uniqueId": "***",
          "messageId": null,
          "customerId": "***",
          "developerMessage": "SUCCESS",
          "userMessage": "SUCCESS",
          "errors": []
        }
      ]
    },

My script, which works when used with _update_by_query:

POST _scripts/test-embedded-script
{
  "script": {
    "lang" : "painless",
     "source": """
        if(ctx._source.responseBody != null )
        {
            for (int i = 0; i < ctx._source.responseBody.results.length; ++i) 
            {
              ctx._source.embedded_error_code = ctx._source.responseBody.results[i].statusCode;
            } 
        }"""
  }
}

My defined pipeline -

PUT _ingest/pipeline/apiproxy-embedded
{
  "processors": [
    {
      "script": {
        "id": "test-embedded-script"
      }
    }
  ]
}

I want to run the above script at runtime, which is why I'm using an ingest pipeline.
test-embedded-script isn't working in the apiproxy-embedded pipeline.
Please help me write this script as one of the processors of my pipeline. :pray:

An ingest pipeline runs only at ingest time, while documents are being indexed. If you want to run the script at query time, use runtime mappings in the query. "Define runtime fields in a search request" may help you.

Thank you @Tomo_M, I'll try this and will update.
Just a quick question about ingest pipelines: why shouldn't I use an ingest pipeline over runtime fields?

@Tomo_M I tried this, and as far as I understand it only works if my existing field is mapped or has to be mapped. For my use case I don't want to map my existing field (responseBody.results[i].statusCode), only the new field embedded_error_code.

I leaned towards the ingest pipeline because it worked in my other cases: it gives me a single new field without mapping my existing field, which might be a JSON array. If there is any other solution, please help :pray:

What did you mean by "at runtime"?

In most cases, retrieve field values through doc_values whenever possible. Accessing doc_values with a runtime field is faster than retrieving values from _source because of how data is loaded from Lucene.
However, there are cases where retrieving fields from _source is necessary. For example, text fields do not have doc_values available by default, so you have to retrieve values from _source. In other instances, you might choose to disable doc_values on a specific field.
Map a runtime field | Elasticsearch Guide [8.0] | Elastic

Accessing _source in a runtime field is also possible.
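
As an illustration of reading from _source in a runtime field, here is a minimal sketch of a search request that defines the field at query time. The index pattern `test-apiproxy-*` and the `long` type are assumptions, not something confirmed in this thread, and `runtime_mappings` needs a version that supports runtime fields (7.11 or later). The type checks are there because responseBody is not always an object.

```console
GET test-apiproxy-*/_search
{
  "runtime_mappings": {
    "embedded_error_code": {
      "type": "long",
      "script": {
        "source": """
          // Read from _source, so responseBody itself does not need doc_values.
          def body = params._source['responseBody'];
          if (body instanceof Map && body['results'] instanceof List) {
            for (def result : body['results']) {
              // Emit one value per numeric statusCode found in the list.
              if (result instanceof Map && result['statusCode'] instanceof Number) {
                emit(((Number) result['statusCode']).longValue());
              }
            }
          }
        """
      }
    }
  },
  "fields": ["embedded_error_code"]
}
```

Because the script runs for every matching document at search time, this is likely slower than computing the value once in an ingest pipeline.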

Hi @Tomo_M,
By "at runtime" I mean doing all of this while the data picks up the defined mapping or the default pipeline configuration from the index template.

Now,
Why the runtime field didn't work for me: this field (responseBody.results[i].statusCode) has a different data type in different documents. For example, my logs come from different apiProxies, so I get different responseBody values; sometimes it's a keyword and sometimes it's an object.

The ingest pipeline worked for my other cases, where I didn't have a list in my responseBody, but not here (responseBody.results[i].statusCode), because results is a list.

As you suggested before, I defined this mapping -

"mappings": {
      "runtime": {
        "runtime_field_24": {
          "type": "keyword",
          "script": {
            "source": "emit(doc['responseBody']['results']['statusCode'].value)"
          }
        }
      }
}

But I got this error -
[screenshot of the error]

Let me know if I made any mistake in defining this mapping, or whether we can do this via an ingest pipeline.

That is not the Elastic meaning of "runtime". I suppose it is better to avoid using the word in another sense, to avoid confusion.

Please share the error as text; a screenshot is difficult to read. It is not searchable either.

That said, the error is caused by the 'responseBody' field type not being consistent with the mappings. The error will occur even without the runtime mapping.

The error from the screenshot above:
Object mapping for [responseBody] tried to parse field [responseBody] as object, but found a concrete value

That said, the error is caused by the 'responseBody' field type not being consistent with the mappings. The error will occur even without the runtime mapping.

To answer the above statement -

That's why I'm not indexing this field (responseBody) in my index pattern, so that I don't get a conflicting-field issue (which occurs when the same field has different data types within a single index pattern).

Your pipeline just adds new fields to the _source.

Use the remove processor to remove the other fields, or just refresh ctx._source in your script.

Hi @Tomo_M ,
Are you suggesting that I (1) write a script like this and then (2) define a processor in my pipeline?

POST _scripts/test-embedded-script
{
  "script": {
    "lang" : "painless",
     "source": """
        if(ctx._source.responseBody != null )
        {
            for (int i = 0; i < ctx._source.responseBody.results.length; ++i) 
            {
              ctx._source.embedded_error_code = ctx._source.responseBody.results[i].statusCode;
            } 
        }"""
  }
}
PUT _ingest/pipeline/apiproxy-embedded
{
  "processors": [
    {
      "script": {
        "id": "test-embedded-script"
      }
    }
  ]
}

I didn't understand this -

Your pipeline just adds new fields to the _source.

It would be really helpful if you could point out the exact line I have to change or modify in my script/pipeline above.

Maybe you did not turn off dynamic mappings, so the field type mapping for responseBody was created when the first document was indexed. The error then occurred when documents containing a responseBody of another type were indexed.
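
To illustrate the dynamic-mapping point, here is a minimal sketch of an index whose mapping has dynamic mapping turned off, so that only explicitly listed fields are mapped. The index name `test-apiproxy-example` and the `long` type for embedded_error_code are assumptions for illustration only.

```console
PUT test-apiproxy-example
{
  "mappings": {
    "dynamic": false,
    "properties": {
      "embedded_error_code": { "type": "long" }
    }
  }
}
```

With `"dynamic": false`, an unmapped field such as responseBody stays in _source but is not indexed or added to the mapping, so a string in one document and an object in another should not conflict.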

That meant your processor does not remove the original fields.

Try:

POST _scripts/test-embedded-script
{
  "script": {
    "lang" : "painless",
     "source": """
        if(ctx._source.responseBody != null )
        {
            for (int i = 0; i < ctx._source.responseBody.results.length; ++i) 
            {
              ctx._source.embedded_error_code = ctx._source.responseBody.results[i].statusCode;
            } 
        }"""
  }
}

PUT _ingest/pipeline/apiproxy-embedded
{
  "processors": [
    {
      "script": {
        "id": "test-embedded-script"
      }
    },
    {
      "remove": {
        "field": ["responseBody"]
      }
    }
  ]
}

Hi @Tomo_M ,
Even after trying the script and pipeline processors you suggested above, I'm getting this error while reindexing my data for testing:

  "failures" : [
    {
      "index" : "test-apiproxy-2022.02.02",
      "type" : "_doc",
      "id" : "EaXcvH4B1BqwuQQQ8qv3",
      "cause" : {
        "type" : "script_exception",
        "reason" : "runtime error",
        "script_stack" : [
          "if(ctx._source.responseBody != null )\r\n        {\r\n            ",
          "              ^---- HERE"
        ],
        "script" : "embeddedcode-script",
        "lang" : "painless",
        "position" : {
          "offset" : 24,
          "start" : 10,
          "end" : 72
        },
        "caused_by" : {
          "type" : "null_pointer_exception",
          "reason" : "cannot access method/field [responseBody] from a null def reference"
        }
      },
      "status" : 400
    },
]

I don't understand the null pointer error, given that I check this condition in my script:

if(ctx._source.responseBody != null )

Check if ctx._source.containsKey("responseBody").

Getting this error -

    {
      "index" : "test-apiproxy-2022.02.02",
      "type" : "_doc",
      "id" : "EaXcvH4B1BqwuQQQ8qv3",
      "cause" : {
        "type" : "script_exception",
        "reason" : "runtime error",
        "script_stack" : [
          "if(ctx._source.containsKey(\"responseBody\"))\r\n        {\r\n            ",
          "              ^---- HERE"
        ],
        "script" : "embeddedcode-script",
        "lang" : "painless",
        "position" : {
          "offset" : 24,
          "start" : 10,
          "end" : 78
        },
        "caused_by" : {
          "type" : "null_pointer_exception",
          "reason" : "cannot access method/field [containsKey] from a null def reference"
        }
      },
      "status" : 400
    },

Oh sorry. You have to use the ctx['my-field'] or ctx.<my-field> syntax in an ingest pipeline script processor.
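
In an ingest script processor, `ctx` is the document being ingested, so top-level fields are read from it directly and there is no `ctx._source` as in `_update_by_query`. The following is a minimal sketch using the simulate API with an inline pipeline; the two sample documents are trimmed-down assumptions based on the structure posted earlier, and only the first statusCode is copied to keep it short.

```console
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "script": {
          "lang": "painless",
          "source": """
            // ctx is the document itself; a missing field simply returns null.
            def body = ctx['responseBody'];
            // responseBody is a plain string in some documents, so check the types first.
            if (body instanceof Map && body['results'] instanceof List && !body['results'].isEmpty()) {
              ctx['embedded_error_code'] = body['results'][0]['statusCode'];
            }
          """
        }
      }
    ]
  },
  "docs": [
    { "_source": { "responseBody": { "results": [ { "hasErrors": false, "statusCode": 201 } ] } } },
    { "_source": { "responseBody": "plain text body" } }
  ]
}
```

The `instanceof` checks replace the `!= null` test, which covers both the missing-field case and the case where responseBody is a string rather than an object.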

I tested with this script. This time I didn't get any error, but I didn't get a result in embedded_error_code either. Are there any mistakes in the script below?
Note: I defined this script directly in the ingest pipeline's processor list.

    {
      "script": {
        "description": "testing responseBody script again",
        "lang": "painless",
        "source": """
        if( ctx.containsKey(['responseBody']))
        {
            for (int i = 0; i < ctx['responseBody']['results'].value.size(); ++i) 
            {
              ctx['embedded_error_code'] = ctx['responseBody']['results'][i]['statusCode'];
            } 
        }"""
      }
    }

It should be if( ctx.containsKey('responseBody'))
But I'm not sure whether ctx has a containsKey method.

Using this script, I no longer get the null pointer exception while reindexing, but I still don't get the values of ['responseBody']['results'][i]['statusCode'] in [embedded_error_code].

POST _scripts/embeddedcode_script
{
  "script": {
    "description": "testing responseBody script again",
    "lang": "painless",
    "source": """
        if( ctx['responseBody'] != null && ctx.containsKey('results'))
        {
            for (int i = 0; i < ctx['responseBody']['results'].value.size(); ++i) 
            {
              ctx['embedded_error_code'] = ctx['responseBody']['results'][i]['statusCode'];
            } 
        }"""
  }
}

I don't understand what the mistake is here, or why I'm not getting values in embedded_error_code.

When I tested a document against the pipeline, the script was executed, but I guess this line is not working in the script the way it worked for _update_by_query -

 ctx['embedded_error_code'] = ctx['responseBody']['results'][i]['statusCode'];

Any idea how I can assign an existing field's value to another field inside an ingest pipeline script?

That assignment is the way to set a value on that field. But your script overwrites the value in each iteration.

I'm not sure what you want to do, but it is possible to create an array, add a value in each iteration, and finally set the array to ctx['embedded_error_code'].
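
The array approach described above could be sketched as follows. This is an illustration under assumptions, not a confirmed fix from the thread: it reuses the pipeline name apiproxy-embedded, puts the script inline instead of using a stored script, and keeps the remove processor suggested earlier, with `ignore_missing` added so that documents without responseBody do not fail.

```console
PUT _ingest/pipeline/apiproxy-embedded
{
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
          def body = ctx['responseBody'];
          if (body instanceof Map && body['results'] instanceof List) {
            // Collect every statusCode instead of overwriting one field per iteration.
            List codes = new ArrayList();
            for (def result : body['results']) {
              if (result instanceof Map && result['statusCode'] != null) {
                codes.add(result['statusCode']);
              }
            }
            if (!codes.isEmpty()) {
              ctx['embedded_error_code'] = codes;
            }
          }
        """
      }
    },
    {
      "remove": {
        "field": "responseBody",
        "ignore_missing": true
      }
    }
  ]
}
```

The pipeline can be tried on sample documents with `POST _ingest/pipeline/apiproxy-embedded/_simulate` before it is used for reindexing. Note that the earlier attempts checked `ctx.containsKey('results')` and called `.value.size()`; results is nested under responseBody rather than at the top level, and a list has `size()` but no `.value`, which may explain why no value was written.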