How to invoke processors with BulkIngester using the Java APIs

I'm trying to use the Java APIs to invoke a processor while doing bulk indexing. I've set up the processor on my index to use the intfloat__multilingual-e5-base model (see below) to map the value of the passage field to a passage_embedding target. My mapping defines passage_embedding.predicted_value (see below). When running, I get the passage field, but I don't get any of the embeddings. I expect I need to relate the processor defined on my index to the BulkIngester, but I can't figure out how to do this. I suspect I need to add an action to the BulkIngester to run the processor...

Code for the pipeline:

        _client.ingest().putPipeline(pr -> pr
            .id("rdp_pipeline")
            .processors(p -> p
                .inference(i -> i
                    .modelId("intfloat__multilingual-e5-base")
                    .fieldMap("passage", JsonData.of("text_field"))
                    .targetField("passage_embedding")))
            .processors(p -> p
                .inference(i -> i
                    .modelId("intfloat__multilingual-e5-base")
                    .fieldMap("path", JsonData.of("text_field"))
                    .targetField("path_embedding"))));

Code for the BulkIngester:

            BulkIngester<ObjectNode> ingester = BulkIngester.of(b -> b
                .client(_client)
                .maxOperations(100)
                .flushInterval(1L, TimeUnit.SECONDS));

Mappings Definition:

			"path": {
				"type": "text",
				"term_vector": "with_positions_offsets",
				"index_options": "offsets",
				"store": "true",
				"fields": {
					"type": "keyword",
					"ignore_above": 512
				}
			},
			"path_embedding.predicted_value": {
				"type": "dense_vector",
				"dims": 768,
				"similarity": "cosine",
				"index": "true",
				"store": "true"
			},

Welcome!

Multiple ways:

  • Set the pipeline on each operation:
ingester.add(bo -> bo.index(io -> io.index(indexName).pipeline("rdp_pipeline").document(data)));
  • Set the pipeline globally on the bulk ingester:
BulkIngester<ObjectNode> ingester = BulkIngester.of(b -> b
  .client(_client)
  .globalSettings(
     gs -> gs.pipeline("rdp_pipeline")
  )
  .maxOperations(100)
  .flushInterval(1L, TimeUnit.SECONDS));

The best options are 1 and 3 IMO. Note that with option 3 you can also define the index name globally, so you don't have to set it on every operation, which reduces the data going over the wire.
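
To make the per-operation variant concrete, here is a minimal stdlib-only sketch of the newline-delimited JSON body that a bulk request carries when the pipeline is named on each operation. The class and method names (BulkBodySketch, indexAction, bulkBody) and the index name my_index are hypothetical, and the sketch assumes names that need no JSON escaping; the Java client builds this body for you. With the global setting, the pipeline should instead travel once as the `pipeline` request parameter rather than in every action line.

```java
import java.util.List;

// Hypothetical helper: illustrates the wire format only; the Java client builds this for you.
class BulkBodySketch {

    // One action line of a bulk request, naming the ingest pipeline for this operation.
    // Assumes index and pipeline names contain no characters that need JSON escaping.
    static String indexAction(String index, String pipeline) {
        return "{\"index\":{\"_index\":\"" + index + "\",\"pipeline\":\"" + pipeline + "\"}}";
    }

    // A bulk body is newline-delimited JSON: an action line, then the document source,
    // each terminated by a newline.
    static String bulkBody(String index, String pipeline, List<String> jsonDocs) {
        StringBuilder sb = new StringBuilder();
        for (String doc : jsonDocs) {
            sb.append(indexAction(index, pipeline)).append('\n');
            sb.append(doc).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(bulkBody("my_index", "rdp_pipeline",
            List.of("{\"passage\":\"hello\"}")));
    }
}
```

Repeating the index and pipeline in every action line is the overhead that the global setting avoids.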

Thank you for the quick response. I'm not sure how I would have discovered this solution. I tried adding the .globalSettings (option 3), but I'm still not seeing the passage_embedding nor path_embedding when viewing the content of my index. I only see the fields I passed in my content object.

Is the pipeline processing postponed until after all the content is indexed or is there something else I need to invoke for the processors to be invoked?

I'm having trouble determining how to debug something like this (e.g., is there a way to turn on a trace to dump the REST requests to see the actual JSON being generated from the API calls?).

I'm using the Elastic GUI on port 5601 to view the Content / Indices. I have verified the processors are associated with the model, and when viewing the index's machine learning inference pipelines (view details) I can see the processors in the definition:

{
  "processors": [
    {
      "inference": {
        "model_id": "intfloat__multilingual-e5-base",
        "target_field": "passage_embedding",
        "field_map": {
          "passage": "text_field"
        }
      }
    },
    {
      "inference": {
        "model_id": "intfloat__multilingual-e5-base",
        "target_field": "path_embedding",
        "field_map": {
          "path": "text_field"
        }
      }
    }
  ]
}

but the Ingest stats are all 0.

I am not sure how to add inferenceConfig results to the processor. Perhaps this is the problem -- that the generated predicted_value doesn't know where to post the result?

So, I've added my guess at an inferenceConfig as follows:

        _client.ingest().putPipeline(pr -> pr
            .id("rdp_pipeline")
            .processors(p -> p
                .inference(i -> i
                    .modelId("intfloat__multilingual-e5-base")
                    .fieldMap("passage", JsonData.of("text_field"))
                    .targetField("passage_embedding")
                    .inferenceConfig(ic -> ic
                        .regression(icr -> icr
                            .resultsField("predicted_value")))))
            .processors(p -> p
                .inference(i -> i
                    .modelId("intfloat__multilingual-e5-base")
                    .fieldMap("path", JsonData.of("text_field"))
                    .targetField("path_embedding")
                    .inferenceConfig(ic -> ic
                        .regression(icr -> icr
                            .resultsField("predicted_value"))))));

which translated to the following:

{
  "processors": [
    {
      "inference": {
        "model_id": "intfloat__multilingual-e5-base",
        "target_field": "passage_embedding",
        "field_map": {
          "passage": "text_field"
        },
        "inference_config": {
          "regression": {
            "results_field": "predicted_value"
          }
        }
      }
    },
    {
      "inference": {
        "model_id": "intfloat__multilingual-e5-base",
        "target_field": "path_embedding",
        "field_map": {
          "path": "text_field"
        },
        "inference_config": {
          "regression": {
            "results_field": "predicted_value"
          }
        }
      }
    }
  ]
}

and now I see stats but all of them are failing...

I'm not sure what should go with the intfloat__multilingual-e5-base model, but I believe we can only pick between regression and classification... I have no way to understand why it is failing (no exceptions are thrown), and the add method is void, so nothing is returned...

So, I added a BulkListener to get errors and started the model, but I am now at this error:

elasticsearch-rest-client-0-thread-2", "message": "Failed to index file null - Trained model [intfloat__multilingual-e5-base] is configured for task [text_embedding] but called with task [regression]" }

The Java API doesn't appear to have text_embedding as an option in the inferenceConfig, only regression and classification. Is there a way to specify text_embedding via JSON?

I've devolved to trying to create an InferenceConfig from JSON but I don't know the format it wants to create one for text_embedding with a results_field set to predicted_value.

Trying with:

{
    "text_embedding": {
        "results_field": "predicted_value"
    }
}

it complains about an unknown field 'text_embedding'. I also tried with:

{
    "inference_config": {
        "text_embedding": {
            "results_field": "tokens"
        }
    }
}

I've tried falling back to creating a single processor from JSON, but it still fails on the "text_embedding" field (which the model seemed to want, per the error several replies above).

{
	"inference": {
		"model_id": "intfloat__multilingual-e5-base",
		"field_map": {
			"passage": "text_field"
		},
		"target_field": "passage_embedding",
                "inference_config": {
	            "text_embedding": {
	                 "results_field": "predicted_value"
	             }
	     }
	}
}

I get this error:

Error deserializing co.elastic.clients.elasticsearch.ingest.InferenceConfig: Unknown field 'text_embedding' (JSON path: inference.inference_config.text_embedding) (line no=9, column no=23, offset=202)

Looking at this article, they don't even specify the inference config. If I try without it, the processors get created, but I still don't see any of the dense vector values in the indexed documents. No errors are produced, though, using:

{
	"inference": {
		"model_id": "intfloat__multilingual-e5-base",
		"field_map": {
			"passage": "text_field"
		},
		"target_field": "passage_embedding"
	}
}

I'm convinced there is a deficiency in the Java APIs that won't allow creation of an InferenceConfig with anything but regression or classification, so I'm basically SOL.

Any help @dadoonet is greatly appreciated.

I fell back to using a raw REST request to set the processors for my index, using the low-level REST client obtained from the client's transport (see the code further below).

rdpPipeline contains JSON like:

{
  "processors": [
    {
      "set": {
        "field": "_id",
        "value": "{{id}}"
      }
    },
    {
      "inference": {
        "model_id": "intfloat__multilingual-e5-base",
        "target_field": "passage_embedding",
        "field_map": {
          "passage": "text_field"
        },
        "inference_config": {
          "text_embedding": {
            "results_field": "tokens"
          }
        }
      }
    },
    {
      "inference": {
        "model_id": "intfloat__multilingual-e5-base",
        "target_field": "path_embedding",
        "field_map": {
          "path": "text_field"
        },
        "inference_config": {
          "text_embedding": {
            "results_field": "tokens"
          }
        }
      }
    }
  ]
}

and then this code sends the request that creates the pipeline with those processors:

        Request request = new Request("PUT", "/_ingest/pipeline/rdp_pipeline");
        request.setJsonEntity(rdpPipeline);
        RestClientTransport restClientTransport = (RestClientTransport) client
            ._transport();
        Response response = restClientTransport.restClient()
            .performRequest(request);
        if (response.getStatusLine().getStatusCode() == 200) {
            ObjectMapper objectMapper = new ObjectMapper();
            String acknowledged = EntityUtils.toString(response.getEntity());
            AcknowledgedResponse ak_response = objectMapper
                .readValue(acknowledged, CreatePipelineResponse.class);
            if (!ak_response.acknowledged()) {
                logger.error("Creating pipeline returned false.");
            }
        } else {
            logger.error("Could not set rdp_pipeline due to request status "
                + response.getStatusLine().getStatusCode());
        }

With the CreatePipelineResponse class set up like:

public class CreatePipelineResponse implements AcknowledgedResponse {

    public boolean acknowledged = false;
    
    CreatePipelineResponse() {
        // No-arg constructor used by the ObjectMapper when deserializing the response.
    }
    
    CreatePipelineResponse(boolean acknowledged) {
        this.acknowledged = acknowledged;
    }
    
    @Override
    public boolean acknowledged() {
        return acknowledged;
    }

}
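
As a possible alternative to casting the transport and mapping the response onto a custom class, the same PUT can be sketched with the JDK's own java.net.http client. This is only a sketch under stated assumptions: the class and method names are hypothetical, the base URL http://localhost:9200 assumes an unsecured local cluster (no TLS or authentication), and the acknowledged check is a naive pattern match rather than real JSON parsing.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Pattern;

// Hypothetical stdlib-only alternative; not part of the Elasticsearch Java client.
class PutPipelineSketch {

    private static final Pattern ACKNOWLEDGED =
        Pattern.compile("\"acknowledged\"\\s*:\\s*true");

    // Builds PUT <base>/_ingest/pipeline/<pipelineId> with a JSON body.
    static HttpRequest putPipeline(URI base, String pipelineId, String pipelineJson) {
        return HttpRequest.newBuilder(base.resolve("/_ingest/pipeline/" + pipelineId))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(pipelineJson))
            .build();
    }

    // Naive check of the response body; a JSON parser would be more robust.
    static boolean isAcknowledged(String responseBody) {
        return ACKNOWLEDGED.matcher(responseBody).find();
    }

    // Sends the request. Needs a reachable cluster, so main() does not call it.
    static boolean send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() == 200 && isAcknowledged(response.body());
    }

    public static void main(String[] args) {
        // Assumed base URL; an empty processor list stands in for the real pipeline JSON.
        HttpRequest request = putPipeline(
            URI.create("http://localhost:9200"), "rdp_pipeline", "{\"processors\":[]}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Reusing the RestClient that the Java client already holds, as in the code above, keeps its connection and authentication settings, so this variant mainly helps for quick experiments.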

Remove the set processor from the pipeline JSON above, as I believe it is an error.
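
With the set processor removed, the pipeline body reduces to the two inference processors. The following is a minimal sketch that generates that JSON rather than hand-writing it; the class and method names are hypothetical, and results_field is left as a parameter because this thread uses both tokens (in the pipeline) and predicted_value (in the mapping).

```java
// Hypothetical helper: builds the pipeline body as a string, with no "set" processor.
// Assumes all arguments are plain identifiers that need no JSON escaping.
class PipelineJsonSketch {

    // One inference processor that maps sourceField to the model's text_field input
    // and uses the text_embedding inference config.
    static String inferenceProcessor(String modelId, String sourceField,
                                     String targetField, String resultsField) {
        return String.format(
            "{\"inference\":{\"model_id\":\"%s\",\"target_field\":\"%s\","
                + "\"field_map\":{\"%s\":\"text_field\"},"
                + "\"inference_config\":{\"text_embedding\":{\"results_field\":\"%s\"}}}}",
            modelId, targetField, sourceField, resultsField);
    }

    // The full pipeline body: one processor for passage, one for path.
    static String pipeline(String modelId, String resultsField) {
        return "{\"processors\":["
            + inferenceProcessor(modelId, "passage", "passage_embedding", resultsField) + ","
            + inferenceProcessor(modelId, "path", "path_embedding", resultsField)
            + "]}";
    }

    public static void main(String[] args) {
        System.out.println(pipeline("intfloat__multilingual-e5-base", "tokens"));
    }
}
```

The resulting string can be passed as rdpPipeline to request.setJsonEntity in the code above.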