Dec 2nd, 2017: [JP]Elasticsearch] Elasticsearch Ingest pluginの作り方

Ingest Nodeの機能は5で追加されました。Elasticsearchにデータを送信し、インデックスに登録する前処理として、色々な処理が可能です。例えば、Grok pluginを利用すれば、正規表現を利用して文字列を構造化したりできます。また、GeoIP pluginを利用すれば、IPアドレスを元に、緯度経度などの地理情報を追加するといったことも前処理で可能です。

独自のIngest Node pluginを作ってみる

Elasticsearchはまた、色々な機能をプラグインとして追加可能です。
そんなプラグインを独自に作成することもできます。
ということで、Ingest pluginを簡単につくる方法を今日は紹介します。

前提条件

ローカルにcookiecutterコマンドがインストールされていること。
cookiecutterのインストールについてはこちらをご覧ください。

cookiecutterはプロジェクトテンプレート

以下を実行すれば、あとは対話形式で決めていくだけになります。
今回は、サンプルとして、helloというフィールドにworld!という値を追加するようなサンプルのIngest Processorを作成してみます。

cookiecutter gh:spinscale/cookiecutter-elasticsearch-ingest-processor

決める内容としては、以下の4つです。

  • processor_type
    • Processorの名前
  • description
    • このProcessorが何を行うかという説明
  • developer_name
    • あなたのお名前
  • elasticsearch_version
    • 対象とするElasticsearchのバージョン

サンプルの名前を"helloworld"としてみます。

processor_type [awesome]: helloworld
package_dir [helloworld]: 
description [Ingest processor that is doing something awesome]: Ingest processor that add "hello" field               
developer_name [YOUR REAL NAME]: Jun Ohtani
elasticsearch_version [6.0.0]: 

processor_typeは全て小文字のアルファベットでなければいけないという制約があります。
上記を入力し終われば、プロジェクトのディレクトリが出来上がります。

実際に作成されたプロジェクトの構成は次のようになります。

ディレクトリ構成はこちら
ingest-helloworld/
├── LICENSE.txt
├── NOTICE.txt
├── README.md
├── build.gradle
├── settings.gradle
└── src
    ├── main
    │   ├── java
    │   │   └── org
    │   │       └── elasticsearch
    │   │           └── plugin
    │   │               └── ingest
    │   │                   └── helloworld
    │   │                       ├── HelloworldProcessor.java
    │   │                       └── IngestHelloworldPlugin.java
    │   └── plugin-metadata
    │       └── plugin-security.policy
    └── test
        ├── java
        │   └── org
        │       └── elasticsearch
        │           └── plugin
        │               └── ingest
        │                   └── helloworld
        │                       ├── HelloworldProcessorTests.java
        │                       └── HelloworldRestIT.java
        └── resources
            └── rest-api-spec
                └── test
                    └── ingest-helloworld
                        ├── 10_basic.yml
                        └── 20_helloworld_processor.yml

ご覧のようにGradleの設定ファイルなども用意されます。
src/mainの下にJavaのクラスが、src/testの下にテストクラスの雛形が出来上がります。

Helloworld Processor Pluginの実装

HelloworldProcessor.javaが、実際にIngestで処理したいものを記述します。
今回のサンプルでは、Ingest Processorの設定として、以下のものを用意します。

  • target_field
    • 追加するフィールド名。デフォルトはhello
  • target_value
    • target_fieldに入れる値。デフォルトはworld!

Processorの設定はFactoryクラスのcreateメソッドで読み込んで、Processorに設定します。
target_fieldtarget_valueを設定読み込みます。この時、デフォルト値としてそれぞれhelloworld!を指定するようにしておきます(targetFieldtargetValueというプロパティもHelloworldProcessorクラスに追加してあります。また、plugin-security.policyというファイルができていますが、今回は特に必要ないので、削除してあります)。

    public static final class Factory implements Processor.Factory {

        @Override
        public HelloworldProcessor create(Map<String, Processor.Factory> factories, String tag, Map<String, Object> config) 
            throws Exception {
            String targetField = readStringProperty(TYPE, tag, config, "target_field", "hello");
            String targetValue = readStringProperty(TYPE, tag, config, "target_value", "world!");

            return new HelloworldProcessor(tag, targetField, targetValue);
        }
    }

次に、実際にDocumentに対して行われる前処理を実装します。
これは、executeメソッドになります。引数のingestDocumentが前処理対象のドキュメントです。

    @Override
    public void execute(IngestDocument ingestDocument) throws Exception {
        ingestDocument.setFieldValue(targetField, targetValue);
    }

その他に、Pluginクラスというのもあります。こちらは、elasticsearch.ymlから設定を読み込む場合にこちらのクラスで設定します。今回のサンプルでは利用しません。
さらにテストなどを書けば立派なプラグインの完成です。

実際にElasticsearchにinstallして動作確認

では、実際にElasticsearchに追加できるプラグインのZipを作ってみましょう。
今回利用したプロジェクトテンプレートはGradleでビルドできるようにGradleのファイルも用意してくれています。
ですので、以下のコマンドを実行すれば、テストの実行も行われ、zipファイルまで生成されます。

gradle clean check

ビルドが成功すると、build/distributionsというディレクトリが作成され、
ingest-helloworld-0.0.1-SNAPSHOT.zipが出来上がります。
あとは、Elasticsearchにインストールです。

bin/elasticsearch-plugin install file:///path/to/ingest-helloworld/build/distribution/ingest-helloworld-0.0.1-SNAPSHOT.zip

インストールできたら、Elasticsearchを起動して、Ingest APIのSimulate Pipeline APIで試しみましょう。

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "sample",
    "processors": [
      {
        "helloworld" : {
        }
      }
    ]
  },
  "docs": [
    {"_source": {}}
  ]
}

するとhelloというフィールドにworld!という値が追加されているのが、レスポンスからわかります。

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_type": "_type",
        "_id": "_id",
        "_source": {
          "hello": "world!"
        },
        "_ingest": {
          "timestamp": "2017-11-28T08:55:25.628Z"
        }
      }
    }
  ]
}

まとめ

簡単ですが、Cookiecutterを利用したIngest Node pluginの作り方を見てきました。

実際に、この方法で、ingest-csv pluginを作成しています。テストの記述などについて詳しく知りたい方は参考にしていただければと思います。
また、テンプレート自体は https://github.com/spinscale/cookiecutter-elasticsearch-ingest-processor にあります。

ぜひ、Ingest Nodeのプラグインで遊んでみてください!

2 Likes