Why doesn't a custom pipeline run automatically on a Crawler index?

The situation:

I created a Crawler index using a domain. After it was created, I added the field "indexedContentType" via a script:

PUT /search-testindex1/_mapping
{
  "properties": {
    "indexedContentType": {
      "type": "text",
      "fields": {
        "keyword": {
          "type": "keyword",
          "ignore_above": 256
        }
      }
    }
  }
}
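(To confirm the new field actually landed in the mapping, you can query it directly with the get-field-mapping API; shown here in Dev Tools console syntax:

```console
GET /search-testindex1/_mapping/field/indexedContentType
```

The response should echo back the text/keyword multi-field definition above.)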

I saw that the index was created with the ent-search-generic-ingestion pipeline as its default, so I cloned that pipeline and gave the copy the same name as the index, "search-testindex1".

Then I created a custom pipeline like this:

PUT _ingest/pipeline/search-testindex1@custom
{
  "processors": [
    {
      "set": {
        "field": "indexedContentType",
        "value": "Documents"
      }
    }
  ]
}
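(Before wiring it into the managed pipeline, it can help to verify the custom pipeline in isolation with the simulate API. The sample document below is just a placeholder:

```console
POST /_ingest/pipeline/search-testindex1@custom/_simulate
{
  "docs": [
    { "_source": { "title": "sample crawled page" } }
  ]
}
```

The simulated doc in the response should come back with "indexedContentType": "Documents" set.)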

And in the "search-testindex1" pipeline I added a pipeline processor that calls the custom pipeline "search-testindex1@custom". Here is the processor list:

[
  {
    "attachment": {
      "description": "Extract text from binary attachments",
      "field": "_attachment",
      "target_field": "_extracted_attachment",
      "ignore_missing": true,
      "indexed_chars_field": "_attachment_indexed_chars",
      "if": "ctx?._extract_binary_content == true",
      "on_failure": [
        {
          "append": {
            "description": "Record error information",
            "field": "_ingestion_errors",
            "value": "Processor 'attachment' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'"
          }
        }
      ]
    }
  },
  {
    "pipeline": {
      "name": "search-testindex1@custom",
      "on_failure": [
        {
          "append": {
            "field": "_ingestion_errors",
            "value": [
              "Processor 'pipeline' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'"
            ]
          }
        }
      ]
    }
  },
  {
    "set": {
      "tag": "set_body",
      "description": "Set any extracted text on the 'body' field",
      "field": "body",
      "copy_from": "_extracted_attachment.content",
      "ignore_empty_value": true,
      "if": "ctx?._extract_binary_content == true",
      "on_failure": [
        {
          "append": {
            "description": "Record error information",
            "field": "_ingestion_errors",
            "value": "Processor 'set' with tag 'set_body' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'"
          }
        }
      ]
    }
  },
  {
    "gsub": {
      "tag": "remove_replacement_chars",
      "description": "Remove unicode 'replacement' characters",
      "field": "body",
      "pattern": "�",
      "replacement": "",
      "ignore_missing": true,
      "if": "ctx?._extract_binary_content == true",
      "on_failure": [
        {
          "append": {
            "description": "Record error information",
            "field": "_ingestion_errors",
            "value": "Processor 'gsub' with tag 'remove_replacement_chars' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'"
          }
        }
      ]
    }
  },
  {
    "gsub": {
      "tag": "remove_extra_whitespace",
      "description": "Squish whitespace",
      "field": "body",
      "pattern": "\\s+",
      "replacement": " ",
      "ignore_missing": true,
      "if": "ctx?._reduce_whitespace == true",
      "on_failure": [
        {
          "append": {
            "description": "Record error information",
            "field": "_ingestion_errors",
            "value": "Processor 'gsub' with tag 'remove_extra_whitespace' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'"
          }
        }
      ]
    }
  },
  {
    "trim": {
      "description": "Trim leading and trailing whitespace",
      "field": "body",
      "ignore_missing": true,
      "if": "ctx?._reduce_whitespace == true",
      "on_failure": [
        {
          "append": {
            "description": "Record error information",
            "field": "_ingestion_errors",
            "value": "Processor 'trim' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'"
          }
        }
      ]
    }
  },
  {
    "remove": {
      "tag": "remove_meta_fields",
      "description": "Remove meta fields",
      "field": [
        "_attachment",
        "_attachment_indexed_chars",
        "_extracted_attachment",
        "_extract_binary_content",
        "_reduce_whitespace",
        "_run_ml_inference"
      ],
      "ignore_missing": true,
      "on_failure": [
        {
          "append": {
            "description": "Record error information",
            "field": "_ingestion_errors",
            "value": "Processor 'remove' with tag 'remove_meta_fields' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'"
          }
        }
      ]
    }
  }
]

The problem is that the custom pipeline doesn't run at ingest time; I could only run it manually from the console:

POST search-testindex1/_update_by_query?pipeline=search-testindex1@custom
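(One thing worth checking is which pipeline the index actually uses at ingest time. If `index.default_pipeline` still points at ent-search-generic-ingestion rather than your clone, the clone never runs; note also that, depending on your version, the Crawler picks its pipeline from its own configuration rather than from index settings, which is what the answers below address. A quick way to inspect the setting:

```console
GET /search-testindex1/_settings?include_defaults=true&filter_path=**.default_pipeline
```

)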

What am I doing wrong? I'd appreciate any help.


I was just working on this as well. There is no documented API for this. By following the web requests the Kibana UI makes, I noticed two undocumented steps you need to perform if you want to do this programmatically:

  1. Switch from using the default pipeline (ent-search-generic-ingestion)
  2. Update the connector to use your new pipeline

For the first step, you can simply POST to the pipelines endpoint of your index with an empty body:

http://localhost:5601/internal/enterprise_search/indices/search-testindex1/pipelines
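From an external script that could look roughly like the curl call below. This is a sketch: the endpoint is an undocumented Kibana internal API and may change between versions, the kbn-xsrf header is required by Kibana for write requests, and you may additionally need authentication (e.g. `-u elastic:<password>`) depending on your setup:

```shell
curl -X POST \
  -H "kbn-xsrf: true" \
  -H "Content-Type: application/json" \
  "http://localhost:5601/internal/enterprise_search/indices/search-testindex1/pipelines" \
  -d '{}'
```
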

The second step is to add it to the connector:

http://localhost:5601/internal/enterprise_search/connectors/CW5QUI8BMtF7XE7KZ3HP/pipeline

You can get your connector id by calling:

http://localhost:5601/internal/enterprise_search/indices/search-testindex1

That JSON response contains the connector id. It should look something like:

{
    "count": 0,
    "aliases": [],
    "health": "yellow",
    "hidden": false,
    "name": "search-testindex1",
    "status": "open",
    "total": {
        "docs": {
            "count": 0,
            "deleted": 0
        },
        "store": {
            "size_in_bytes": "1.46kb"
        }
    },
    "uuid": "p6A_nswpQyC6Usk2IlRk2g",
    "has_in_progress_syncs": false,
    "has_pending_syncs": false,
    "connector": {
        "id": "Nw2SFo8BCzw0w0oNJqgn",
        "api_key_id": null,
        "api_key_secret_id": null,
        "configuration": {},

Good luck!

Thanks @michaelcizmar, can I do this from the Dev Tools? I couldn't apply your suggestion. I created a new index that has the ent-search-generic-ingestion pipeline as the default and added the custom pipeline, but no success yet :frowning:

I do not think so. I did it from an external script via REST calls; you are interacting with the Kibana endpoint (port 5601), not Elasticsearch, so Dev Tools won't reach it.