Multiple Pipelines with condition

Hi Team,

I have been trying to add a condition to my multi-processor pipeline:
{
  "4modelprocessor_peopleagg": {
    "processors": [
      {
        "pipeline": {
          "name": "ner_pipeline_peopleagg"
        }
      },
      {
        "pipeline": {
          "name": "elser_pipeline_peopleagg"
        }
      }
    ]
  }
}

I should run the above elser or ner pipelines only when the resumecontent or aboutme fields are available.
Is there a way to specify a condition so that a pipeline runs only if one of those fields is present?

Can someone please point me to a reference on this?

Yes, you can conditionally run a processor; please check the documentation.

Thanks for that. But I'm working with the inference processor, and in the documentation I only see examples for the set and drop processors. How can I add my condition to the inference processors below:
{
  "ner_pipeline": {
    "processors": [
      {
        "inference": {
          "model_id": "dslim__bert-base-ner",
          "target_field": "ml.ner_resume_content",
          "field_map": {
            "resumecontent": "text_field",
            "ignore_missing": "true"
          }
        }
      },
      {
        "inference": {
          "model_id": "dslim__bert-base-ner",
          "target_field": "ml.ner_about_me",
          "field_map": {
            "aboutme": "text_field",
            "ignore_missing": "true"
          }
        }
      },
      {
        "script": {
          "lang": "painless",
          "if": "return ctx['ml']['ner_resume_content'].containsKey('entities') && ctx['ml']['ner_about_me'].containsKey('entities')",
          "source": "Map resumeContentTags = new HashMap(); for (item in ctx['ml']['ner_resume_content']['entities']) { if (!resumeContentTags.containsKey(item.class_name)) resumeContentTags[item.class_name] = new HashSet(); resumeContentTags[item.class_name].add(item.entity); } ctx['resume_content_tags'] = resumeContentTags; Map aboutMeTags = new HashMap(); for (item in ctx['ml']['ner_about_me']['entities']) { if (!aboutMeTags.containsKey(item.class_name)) aboutMeTags[item.class_name] = new HashSet(); aboutMeTags[item.class_name].add(item.entity); } ctx['about_me_tags'] = aboutMeTags;"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "description": "Index document to 'failed-<index>'",
          "field": "_index",
          "value": "failed-{{{ _index }}}"
        }
      },
      {
        "set": {
          "description": "Set error message",
          "field": "ingest.failure",
          "value": "{{_ingest.on_failure_message}}"
        }
      }
    ]
  }
}

As mentioned above, I should run the elser or ner pipelines only when the resumecontent or aboutme fields are available. Can you please show what the code would look like with such a condition?

The documentation explains how to conditionally run a processor, and as explained there, it applies to every processor.

Each processor supports an optional if condition, written as a Painless script. If provided, the processor only runs when the if condition evaluates to true.
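For example, a set processor that only runs when the resumecontent field is present might look like this (a sketch; the has_resume target field is an invented illustration, not from the thread):

{
  "set": {
    "if": "ctx.resumecontent != null",
    "field": "has_resume",
    "value": true
  }
}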

If you check the inference processor documentation, you will see that if is listed as an option there as well, with a link to the conditionals documentation.

You need to follow the examples in the documentation; there are some that validate whether a field exists or not.

It will be something like this:

"if": "ctx.resumecontent != null || ctx.aboutme != null"

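Applied to the first inference processor from the earlier snippet, it might look like the following (a sketch; since that processor only reads resumecontent, you could arguably also narrow the condition to just that field, as shown here):

{
  "inference": {
    "if": "ctx.resumecontent != null",
    "model_id": "dslim__bert-base-ner",
    "target_field": "ml.ner_resume_content",
    "field_map": {
      "resumecontent": "text_field"
    }
  }
}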
Thanks for the above condition. I have tried adding my condition, with "ignore_missing": true and also an if condition, like below:
{
    "processors": [
        {
            "inference": {
                "field_map": {
                    "resumecontent": "text_field"
                    //"ignore_missing": "true"
                },
                "if": "ctx.resumecontent != null || ctx.aboutme != null",
                "model_id": "dslim__bert-base-ner",
                "target_field": "ml.ner_resume_content"
            }
        },
    {
        "inference": {
            //"if": "ctx.resumecontent != null || ctx.aboutme != null",
            "field_map": {
                "aboutme": "text_field"
                //"ignore_missing": "true"
            },
            "if": "ctx.resumecontent != null || ctx.aboutme != null",
            "model_id": "dslim__bert-base-ner",
            "target_field": "ml.ner_about_me"
        }
    },
    {
        "script": {
            "lang": "painless",
            "if": "return ctx['ml']['ner_resume_content'].containsKey('entities') && ctx['ml']['ner_about_me'].containsKey('entities')",
            "source": "Map resumeContentTags = new HashMap(); for (item in ctx['ml']['ner_resume_content']['entities']) { if (!resumeContentTags.containsKey(item.class_name)) resumeContentTags[item.class_name] = new HashSet(); resumeContentTags[item.class_name].add(item.entity); } ctx['resume_content_tags'] = resumeContentTags; Map aboutMeTags = new HashMap(); for (item in ctx['ml']['ner_about_me']['entities']) { if (!aboutMeTags.containsKey(item.class_name)) aboutMeTags[item.class_name] = new HashSet(); aboutMeTags[item.class_name].add(item.entity); } ctx['about_me_tags'] = aboutMeTags;"
        }
    }
],
"on_failure": [
    {
        "set": {
            "description": "Index document to 'failed-<index>'",
            "field": "_index",
            "value": "failed-{{{ _index }}}"
        }
    },
    {
        "set": {
            "description": "Set error message",
            "field": "ingest.failure",
            "value": "{{_ingest.on_failure_message}}"
        }
    }
]

}
I have been trying to run the above pipeline via reindexing, with documents that have only the aboutme field, and I don't see anything being created in the final index.

Can you please let me know if I'm missing something?
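One way to narrow this down (a suggestion, not from the thread) is the simulate pipeline API, which runs a pipeline against sample documents and, with verbose=true, shows the outcome of every processor — including whether the on_failure handlers fired and rerouted the document to the failed-* index. The sample document text below is invented:

POST _ingest/pipeline/ner_pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "aboutme": "Jane is a software engineer based in London."
      }
    }
  ]
}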

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.