Question about nested pipeline

I have a question about using nested pipelines :wink:

  1. distribution pipeline processor:
    PUT _ingest/pipeline/pipeline_for_distribution
    {
    "description": "a pipeline for distribution",
    "processors": [
    {
    "pipeline": {
    "name": "pipeline_for_set",
    "if": """
    String tagss = ctx['tags'];
    if (tagss.toLowerCase().contains('openstack')) {
    return true;
    }
    return false;
    """
    }
    },
    {
    "pipeline": {
    "name": "pipeline_for_split",
    "if": """
    String tagsx = ctx['tags'];
    if (tagsx.toLowerCase().contains('hadoop')) {
    return true;
    }
    return false;
    """
    }
    }
    ]
    }
  2. Nested pipeline processor
    PUT _ingest/pipeline/pipeline_for_set
    {
    "description": "a pipeline for set",
    "processors": [
    {
    "pipeline": {
    "name": "pipeline_for_split"
    }
    }
    ]
    }
  3. Normal pipeline processor
    PUT _ingest/pipeline/pipeline_for_split
    {
    "description": "a pipeline for split",
    "processors": [
    {
    "split": {
    "field": "tags",
    "separator": ",",
    "target_field": "tags_arr" // fails if target_field is not set
    }
    }
    ]
    }
  4. Test
    POST _ingest/pipeline/_simulate
    {
    "pipeline": {
    "description": "to split blog tags",
    "processors": [
    {
    "pipeline": {
    "name": "pipeline_for_distribution"
    }
    }
    ]
    },
    "docs": [
    {
    "_index": "index",
    "_id": "idxx",
    "_source": {
    "title": "Introducing cloud computering",
    "tags": "openstack,k8s",
    "content": "You konw,for cloud"
    }
    }
    ]
    }
  5. Error
    {
    "docs" : [
    {
    "error" : {
    "root_cause" : [
    {
    "type" : "exception",
    "reason" : "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: ClassCastException[org.elasticsearch.ingest.ConditionalProcessor$UnmodifiableIngestList cannot be cast to java.lang.String];",
    "header" : {
    "processor_type" : "conditional"
    }
    }
    ],
    "type" : "exception",
    "reason" : "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: ClassCastException[org.elasticsearch.ingest.ConditionalProcessor$UnmodifiableIngestList cannot be cast to java.lang.String];",
    "caused_by" : {
    "type" : "illegal_argument_exception",
    "reason" : "ScriptException[runtime error]; nested: ClassCastException[org.elasticsearch.ingest.ConditionalProcessor$UnmodifiableIngestList cannot be cast to java.lang.String];",
    "caused_by" : {
    "type" : "script_exception",
    "reason" : "runtime error",
    "script_stack" : [
    """
    tagsx = ctx['tags'];

""",
" ^---- HERE"
],
"script" : """
String tagsx = ctx['tags'];
if (tagsx.toLowerCase().contains('hadoop')) {
return true;
}
return false;
""",
"lang" : "painless",
"caused_by" : {
"type" : "class_cast_exception",
"reason" : "org.elasticsearch.ingest.ConditionalProcessor$UnmodifiableIngestList cannot be cast to java.lang.String"
}
}
},
"header" : {
"processor_type" : "conditional"
}
}
}
]
}

Can anyone help me understand this error?

Hi @fengjian1993, welcome to the community.

Unfortunately, it is really difficult for people to help because your code is not formatted and we cannot read it.

Could you please edit your post and format your code?

You can select the code and click the </> button on the formatting bar.

Thanks, then we might be able to help.

Distribution pipeline processor:

PUT _ingest/pipeline/pipeline_for_distribution
{
    "description": "a pipeline for distribution",
    "processors": [
        {
            "pipeline": {
                "name": "pipeline_for_set",
                "if": """
                    String tagss = ctx['tags'];
                    if (tagss.toLowerCase().contains('openstack')) {
                        return true;
                    }
                    return false;
                """
            }
        },
        {
            "pipeline": {
                "name": "pipeline_for_split",
                "if": """
                    String tagsx = ctx['tags'];
                    if (tagsx.toLowerCase().contains('hadoop')) {
                        return true;
                    }
                    return false;
                """
            }
        }
    ]
}

Nested pipeline processor:

PUT _ingest/pipeline/pipeline_for_set
{
    "description": "a pipeline for set",
    "processors": [
        {
            "pipeline": {
                "name": "pipeline_for_split"
            }
        }
    ]
}

Normal pipeline processor:

PUT _ingest/pipeline/pipeline_for_split
{
    "description": "a pipeline for split",
    "processors": [
        {
            "split": {
                "field": "tags",
                "separator": ",",
                "target_field": "tags_arr" // fails if target_field is not set
            }
        }
    ]
}

Test

POST _ingest/pipeline/_simulate
{
    "pipeline": {
        "description": "to split blog tags",
        "processors": [
            {
                "pipeline": {
                    "name": "pipeline_for_distribution"
                }
            }
        ]
    },
    "docs": [
        {
            "_index": "index",
            "_id": "idxx",
            "_source": {
                "title": "Introducing cloud computering",
                "tags": "openstack,k8s",
                "content": "You konw,for cloud"
            }
        }
    ]
}

Error

{
  "docs" : [
    {
      "error" : {
        "root_cause" : [
          {
            "type" : "exception",
            "reason" : "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: ClassCastException[org.elasticsearch.ingest.ConditionalProcessor$UnmodifiableIngestList cannot be cast to java.lang.String];",
            "header" : {
              "processor_type" : "conditional"
            }
          }
        ],
        "type" : "exception",
        "reason" : "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: ClassCastException[org.elasticsearch.ingest.ConditionalProcessor$UnmodifiableIngestList cannot be cast to java.lang.String];",
        "caused_by" : {
          "type" : "illegal_argument_exception",
          "reason" : "ScriptException[runtime error]; nested: ClassCastException[org.elasticsearch.ingest.ConditionalProcessor$UnmodifiableIngestList cannot be cast to java.lang.String];",
          "caused_by" : {
            "type" : "script_exception",
            "reason" : "runtime error",
            "script_stack" : [
              """
tagsx = ctx['tags'];
                        
""",
              "           ^---- HERE"
            ],
            "script" : """
                        String tagsx = ctx['tags'];
                        if (tagsx.toLowerCase().contains('hadoop')) {
                            return true;
                        }
                        return false;
""",
            "lang" : "painless",
            "caused_by" : {
              "type" : "class_cast_exception",
              "reason" : "org.elasticsearch.ingest.ConditionalProcessor$UnmodifiableIngestList cannot be cast to java.lang.String"
            }
          }
        },
        "header" : {
          "processor_type" : "conditional"
        }
      }
    }
  ]
}

Thanks for the reply. I have posted the formatted version above.

@fengjian1993 Notice that the pipeline splits tags on a comma:

PUT _ingest/pipeline/pipeline_for_split
{
    "description": "a pipeline for split",
    "processors": [
        {
            "split": {
                "field": "tags",
                "separator": ",",
                "target_field": "tags_arr" // fails if target_field is not set
            }
        }
    ]
}

And the script fails at this line, with this error:

String tagsx = ctx['tags'];
UnmodifiableIngestList cannot be cast to java.lang.String];

This happens because ctx['tags'] is a List of Strings, not a String, once the split processor above has run.
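One way to check what type tags holds at each step is to simulate the stored pipeline with the verbose flag, which should report the document after each processor. This is only a sketch: it reuses the test document from the question and assumes pipeline_for_distribution has already been stored with the PUT request shown earlier.

```json
POST _ingest/pipeline/pipeline_for_distribution/_simulate?verbose=true
{
  "docs": [
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computering",
        "tags": "openstack,k8s",
        "content": "You konw,for cloud"
      }
    }
  ]
}
```

If tags shows up as an array in the output of an earlier step, any later condition that declares it as a String will hit the same ClassCastException.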

Try evaluating the if condition before the split processor runs, or, inside the if condition, iterate over the list of tags in ctx['tags'] after it has been split.
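A minimal sketch of the second option, assuming tags can arrive either as a comma-separated String (before any split) or as a List of Strings (after a split). The conditions use def instead of String so that no cast is forced, and they check the runtime type before matching. The variable names are illustrative, and this has not been run against the poster's cluster.

```json
PUT _ingest/pipeline/pipeline_for_distribution
{
  "description": "a pipeline for distribution",
  "processors": [
    {
      "pipeline": {
        "name": "pipeline_for_set",
        "if": """
          // def avoids the cast that fails when tags is already a list
          def tags = ctx['tags'];
          if (tags instanceof String) {
            return tags.toLowerCase().contains('openstack');
          }
          if (tags instanceof List) {
            for (def tag : tags) {
              if (tag instanceof String && tag.toLowerCase().contains('openstack')) {
                return true;
              }
            }
          }
          // missing field or unexpected type: skip this pipeline
          return false;
        """
      }
    },
    {
      "pipeline": {
        "name": "pipeline_for_split",
        "if": """
          def tags = ctx['tags'];
          if (tags instanceof String) {
            return tags.toLowerCase().contains('hadoop');
          }
          if (tags instanceof List) {
            for (def tag : tags) {
              if (tag instanceof String && tag.toLowerCase().contains('hadoop')) {
                return true;
              }
            }
          }
          return false;
        """
      }
    }
  ]
}
```

Note that if the second condition matches when tags is already a list, the split processor inside pipeline_for_split will still expect a string in its field, so the first option (checking the condition before splitting) may be the simpler fix.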
