Ingest Processor Plugin with Analyzer Support

We are working on building an ingest processor plugin that uses an analyzer to tokenize text. The resulting tokens can be used to create word clouds.

How does one access an Analyzer to tokenize text inside an ingest processor? Here is what we have so far.
Building on ideas and code from:

  • https://github.com/elastic/elasticsearch/pull/20215/files

  • https://github.com/elastic/elasticsearch/pull/20233/files

  • https://github.com/spinscale/elasticsearch-ingest-opennlp

    /*
     * Copyright [2017] [Daniel Schimpfoessl]
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.purefield.elasticsearch.plugin.ingest.text.tokenizer;

    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.TokenStream;
    import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
    import org.elasticsearch.ElasticsearchException;
    import org.elasticsearch.common.Strings;
    import org.elasticsearch.ingest.AbstractProcessor;
    import org.elasticsearch.ingest.IngestDocument;
    import org.elasticsearch.ingest.Processor;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;

    public class TextTokenizerProcessor extends AbstractProcessor {

      public static final String TYPE = "text_tokenizer";
      private final String sourceField;
      private final String targetField;
      private final String analyzerName;
      private Processor.Parameters parameters;
      private Analyzer analyzer;
    
      public TextTokenizerProcessor(Processor.Parameters parameters, String tag, String sourceField, String targetField,
                                    String analyzerName) {
          super(tag);
          this.parameters   = parameters;
          this.sourceField  = sourceField;
          this.targetField  = targetField;
          this.analyzerName = analyzerName;
          try {
              // Resolve the analyzer by name from the node-level analysis registry.
              this.analyzer = parameters.analysisRegistry.getAnalyzer(analyzerName);
          }
          catch (IOException e) {
              throw new IllegalArgumentException("Analyzer [" + analyzerName + "] is not valid.", e);
          }
          if (this.analyzer == null) {
              throw new IllegalArgumentException("Analyzer [" + analyzerName + "] not found.");
          }
      }
    
      @Override
      public void execute(IngestDocument ingestDocument) {
          // getFieldValue throws IllegalArgumentException if the field is missing or
          // cannot be read as a String; the instanceof check below is defensive.
          Object text = ingestDocument.getFieldValue(sourceField, String.class);

          List<String> tokens = new ArrayList<>();
          if (text instanceof String) {
              tokenize(tokens, (String) text);
          }
          else {
              throw new IllegalArgumentException("Field [" + sourceField + "] is not a String.");
          }
          ingestDocument.setFieldValue(targetField, tokens);
      }
    
      private void tokenize(List<String> tokens, String text) throws ElasticsearchException {
          if (Strings.hasLength(text)) {
              try (TokenStream tokenStream = analyzer.tokenStream(sourceField, text)) {
                  CharTermAttribute term = tokenStream.addAttribute(CharTermAttribute.class);
                  tokenStream.reset();
                  while (tokenStream.incrementToken()) {
                      tokens.add(term.toString());
                  }
                  tokenStream.end();
              }
              catch (IOException e){
                  throw new ElasticsearchException("Failed to tokenize source field [" + sourceField + "]", e);
              }
          }
      }
    
      @Override
      public String getType() {
          return TYPE;
      }
    
      public static final class Factory implements Processor.Factory {
          private Processor.Parameters parameters;
    
          Factory (Processor.Parameters parameters) { this.parameters = parameters; }
    
          @Override
          public TextTokenizerProcessor create(Map<String, Processor.Factory> factories, String tag, Map<String, Object> config) 
              throws Exception {
              String analyzerName = readStringProperty(TYPE, tag, config, "analyzer");
              String sourceField  = readStringProperty(TYPE, tag, config, "source_field");
              String targetField  = readStringProperty(TYPE, tag, config, "target_field", sourceField + "_keywords");
    
              return new TextTokenizerProcessor(parameters, tag, sourceField, targetField, analyzerName);
          }
      }
    }

I think the code looks good. Is there something that is not working?

If you want to create a word cloud, be aware that an analyzer can emit tokens that are not legible if it uses "destructive" filters such as stemming or n-grams. On the code side, since order does not matter in a word cloud, I would return a set or a counting map instead of a list (and return a value rather than mutating a parameter as a side effect).
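For instance, a counting variant of the tokenize helper might look roughly like this (illustrative only; countTokens and the Map return type are my suggestion, and java.util.HashMap would need to be imported):

    // Sketch: count token frequencies instead of collecting an ordered list.
    // A word cloud usually wants (term -> count), and returning a value avoids
    // mutating a parameter as a side effect.
    private Map<String, Integer> countTokens(String text) {
        Map<String, Integer> counts = new HashMap<>();
        if (Strings.hasLength(text)) {
            try (TokenStream tokenStream = analyzer.tokenStream(sourceField, text)) {
                CharTermAttribute term = tokenStream.addAttribute(CharTermAttribute.class);
                tokenStream.reset();
                while (tokenStream.incrementToken()) {
                    counts.merge(term.toString(), 1, Integer::sum);
                }
                tokenStream.end();
            }
            catch (IOException e) {
                throw new ElasticsearchException("Failed to tokenize source field [" + sourceField + "]", e);
            }
        }
        return counts;
    }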


The code works. The concern is how to access an analyzer: the only way that seems possible is through the Processor.Parameters passed in. While this works at execution time, it does not work for the test class, because we would have to mock the parameters just to instantiate TextTokenizerProcessor. How can we test the analyzer properly?
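
One option we are considering (a sketch only, not what is in the plugin) is an extra constructor that accepts an Analyzer directly, so a unit test can pass a plain Lucene analyzer instead of mocking Processor.Parameters:

    // Sketch: package-private constructor for tests, bypassing Processor.Parameters
    // and the AnalysisRegistry lookup.
    TextTokenizerProcessor(String tag, String sourceField, String targetField, Analyzer analyzer) {
        super(tag);
        this.sourceField  = sourceField;
        this.targetField  = targetField;
        this.analyzerName = null;
        this.parameters   = null;
        this.analyzer     = analyzer;
    }

    // A test could then do something like (WhitespaceAnalyzer comes from
    // org.apache.lucene.analysis.core and ships with Lucene):
    //   TextTokenizerProcessor processor =
    //       new TextTokenizerProcessor("tag", "text", "word_cloud", new WhitespaceAnalyzer());
    //   processor.execute(ingestDocument);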

Creating a new pipeline with the processor using a defined custom analyzer fails:

curl -sH 'content-type: application/json' -X PUT http://$es_node:9200/_ingest/pipeline/text-tokenizer-pipeline -d '{
>   "description": "A pipeline to tokenize text into array",
>   "processors": [
>     {
>       "text_tokenizer": {
>         "source_field": "text",
>         "target_field": "word_cloud",
>         "analyzer":     "custom_standard_lowercase_analyzer",
>         "on_failure" : [
>           {
>             "set" : {
>               "field" : "error",
>               "value" : "field \"text\" does not exist, cannot create \"word_cloud\""
>             }
>           }
>         ]
>       }
>     },
>     {
>       "text_tokenizer": {
>         "source_field": "post1",
>         "target_field": "words1",
>         "analyzer":     "stop"
>       }
>     },
>     {
>       "text_tokenizer": {
>         "source_field": "post2",
>         "target_field": "words2",
>         "analyzer":     "simple"
>       }
>     },
>     {
>       "text_tokenizer": {
>         "source_field": "post3",
>         "target_field": "words3",
>         "analyzer":     "standard"
>       }
>     }
>   ]
> }' | jq
{
  "error": {
    "root_cause": [
      {
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: Analyzer [custom_standard_lowercase_analyzer] not found.",
        "header": {
          "processor_type": "text_tokenizer"
        }
      }
    ],
    "type": "exception",
    "reason": "java.lang.IllegalArgumentException: Analyzer [custom_standard_lowercase_analyzer] not found.",
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "Analyzer [custom_standard_lowercase_analyzer] not found."
    },
    "header": {
      "processor_type": "text_tokenizer"
    }
  },
  "status": 500
}

We solved the problem by building the custom analyzer functionality directly with Lucene classes instead of resolving the analyzer by name through the registry.

Happy to share some code, but I am unable to upload or attach it here.
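
To give an idea of the approach, here is a minimal sketch (not our exact code, and it assumes Lucene's CustomAnalyzer from lucene-analyzers-common is on the classpath) of building the equivalent of a "standard tokenizer + lowercase" analyzer directly, without going through the AnalysisRegistry:

    // Requires org.apache.lucene.analysis.Analyzer and
    // org.apache.lucene.analysis.custom.CustomAnalyzer (lucene-analyzers-common).
    static Analyzer buildLowercaseStandardAnalyzer() throws IOException {
        return CustomAnalyzer.builder()
                .withTokenizer("standard")    // StandardTokenizerFactory SPI name
                .addTokenFilter("lowercase")  // LowerCaseFilterFactory SPI name
                .build();
    }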
