ETL処理の実行順序について

大変初歩的な質問で申し訳ないですが、公式リファレンスや書籍では理解できなかったため、ご質問いたします。

質問内容としては、各プラグインはどのような順番で実行されているかということです。

例えば、下記の例です。

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT * from example_table"
  }
}
filter {
  fingerprint {
    source => "id"
    target => "[@metadata][fingerprint]"
    method => "MURMUR3"
  }
}
output {
        elasticsearch {
        hosts => ["localhost:9200"]
        document_id => "%{[@metadata][fingerprint]}"
        index => "20220517_example_tb"
  }
}

上記のconfを使用し、Logstashを実行する。

取得元のDBの「example_table」テーブルには2件のデータが存在するものとする。

<想定1> ※データ1件ずつ各プラグインを通過させる
1件目のデータをinput pluginで取得

1件目のデータをfilter pluginで処理(下記の場合idフィールドからハッシュ値を生成する処理)

1件目のデータをoutput pluginでElasticsearchに投入

2件目のデータをinput pluginで取得

2件目のデータをfilter pluginで処理(下記の場合idフィールドからハッシュ値を生成する処理)

2件目のデータをoutput pluginでElasticsearchに投入

完了

<想定2> ※全データの各プラグインでの処理が完了次第、次のプラグインの実行フェーズに移行する
1件目、2件目のデータをinput pluginで取得

1件目、2件目のデータをfilter pluginで処理(下記の場合idフィールドからハッシュ値を生成する処理)

1件目、2件目のデータをoutput pluginでElasticsearchに投入

完了

<ご質問>

Logstashは、想定1と想定2、どちらの動作を行っているのでしょうか。
フロー図などを見る限り<想定2>の動きをしていると推測しておりますが、確信が持てません。

以上です。よろしくお願いいたします。

はっきりと言及されている箇所は見つけられませんでしたが、恐らくその中間で、

"SELECT * from example_table"の結果をJDBC input pluginが取得し、得られた結果を1行ごとに1つの「イベント」として(内部的な)キューに格納

1件目のイベントをJDBC input pluginがfilter pluginに渡す

1件目のイベントをfilter pluginで処理(下記の場合idフィールドからハッシュ値を生成する処理)

1件目のイベントをoutput pluginでElasticsearchに投入

2件目のイベントをinput pluginがfilter pluginに渡す

2件目のイベントをfilter pluginで処理(下記の場合idフィールドからハッシュ値を生成する処理)

2件目のイベントをoutput pluginでElasticsearchに投入

といった流れと理解しています。

aggregate filter pluginを用いることで、 <想定2>のような処理を行うこともできると思います。

Elasticsearchにindexするとエラーが出るようなデータを途中に入れておいて結果を確認すると、動きが把握できるかと思います。もし試されたら結果を教えて頂けると嬉しいです。

Tomo_Mさま

回答ありがとうございます。
ご返信が遅くなり、申し訳ございません。

Elasticsearchにindexするとエラーが出るようなデータを途中に入れておいて結果を確認すると、動きが把握できるかと思います。もし試されたら結果を教えて頂けると嬉しいです。

こちら試してみました。

DBのデータ

<テーブル1>→birthdayカラムのデータ型定義は date型

select * from sample_tb_1;
+-----------+-------+------------+
| name      | memo  | birthday   |
+-----------+-------+------------+
| testname1 | test1 | 2016-01-01 |
| testname2 | test2 | 2016-01-01 |
+-----------+-------+------------+

show columns from sample_tb_1;
+----------+-------------+------+-----+---------+-------+
| Field    | Type        | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+-------+
| name     | varchar(20) | YES  |     | NULL    |       |
| memo     | varchar(20) | YES  |     | NULL    |       |
| birthday | date        | YES  |     | NULL    |       |
+----------+-------------+------+-----+---------+-------+

<テーブル2>→→birthdayカラムのデータ型定義はvarchar型

select * from sample_tb_2;
+-----------+-------+----------+
| name      | memo  | birthday |
+-----------+-------+----------+
| testname3 | test3 | January  |
| testname4 | test4 | January  |
+-----------+-------+----------+

show columns from sample_tb_2;
+----------+-------------+------+-----+---------+-------+
| Field    | Type        | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+-------+
| name     | varchar(20) | YES  |     | NULL    |       |
| memo     | varchar(20) | YES  |     | NULL    |       |
| birthday | varchar(20) | YES  |     | NULL    |       |
+----------+-------------+------+-----+---------+-------+

Elasticsearchのマッピング定義

→birthdayカラムのデータ型定義はdate型に設定

PUT /20220523_sample_tb
{
  "mappings": {
    "properties": {
      "name": { "type": "text" },
      "memo": { "type": "text" },
      "birthday": { "type": "date" }
    }
  }
}

confファイル

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT * from sample_tb_1,sample_tb_2"
  }
}
output {
        elasticsearch {
        hosts => ["localhost:9200"]
        index => "20220523_sample_tb"
	}
}

・上記のようにconfの設定をした場合、テーブル1(sample_tb_1)取り込み時にはマッピングエラーは発生しないが、テーブル2(sample_tb_2)取り込み時には、birthdayカラムのデータ型不一致によるマッピングエラーが発生すると推測しました。

ログ

[INFO ] 2022-05-23 05:06:30.884 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/etc/logstash/conf.d/jdbc_20220523_sample_tb.conf"], :thread=>"#<Thread:0x3b5fb425 run>"}
[INFO ] 2022-05-23 05:06:31.738 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>0.85}
[INFO ] 2022-05-23 05:06:31.868 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2022-05-23 05:06:31.933 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO ] 2022-05-23 05:06:33.075 [[main]<jdbc] jdbc - (0.019348s) SELECT * from sample_tb_1,sample_tb_2
[WARN ] 2022-05-23 05:06:33.479 [[main]>worker0] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"20220523_sample_tb", :routing=>nil}, {"name"=>"testname3", "@timestamp"=>2022-05-23T05:06:33.129Z, "birthday"=>"January", "@version"=>"1", "memo"=>"test3"}], :response=>{"index"=>{"_index"=>"20220523_sample_tb", "_type"=>"_doc", "_id"=>"wLZP74AB9E9jQ0pHTUti", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [birthday] of type [date] in document with id 'wLZP74AB9E9jQ0pHTUti'. Preview of field's value: 'January'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [January] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
[WARN ] 2022-05-23 05:06:33.481 [[main]>worker0] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"20220523_sample_tb", :routing=>nil}, {"name"=>"testname3", "@timestamp"=>2022-05-23T05:06:33.139Z, "birthday"=>"January", "@version"=>"1", "memo"=>"test3"}], :response=>{"index"=>{"_index"=>"20220523_sample_tb", "_type"=>"_doc", "_id"=>"wbZP74AB9E9jQ0pHTUti", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [birthday] of type [date] in document with id 'wbZP74AB9E9jQ0pHTUti'. Preview of field's value: 'January'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [January] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
[WARN ] 2022-05-23 05:06:33.481 [[main]>worker0] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"20220523_sample_tb", :routing=>nil}, {"name"=>"testname4", "@timestamp"=>2022-05-23T05:06:33.140Z, "birthday"=>"January", "@version"=>"1", "memo"=>"test4"}], :response=>{"index"=>{"_index"=>"20220523_sample_tb", "_type"=>"_doc", "_id"=>"wrZP74AB9E9jQ0pHTUti", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [birthday] of type [date] in document with id 'wrZP74AB9E9jQ0pHTUti'. Preview of field's value: 'January'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [January] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
[WARN ] 2022-05-23 05:06:33.482 [[main]>worker0] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"20220523_sample_tb", :routing=>nil}, {"name"=>"testname4", "@timestamp"=>2022-05-23T05:06:33.141Z, "birthday"=>"January", "@version"=>"1", "memo"=>"test4"}], :response=>{"index"=>{"_index"=>"20220523_sample_tb", "_type"=>"_doc", "_id"=>"w7ZP74AB9E9jQ0pHTUti", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [birthday] of type [date] in document with id 'w7ZP74AB9E9jQ0pHTUti'. Preview of field's value: 'January'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [January] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
[INFO ] 2022-05-23 05:06:33.738 [[main]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"main"}
[INFO ] 2022-05-23 05:06:34.063 [Converge PipelineAction::Delete<main>] pipelinesregistry - Removed pipeline from registry successfully {:pipeline_id=>:main}
[INFO ] 2022-05-23 05:06:34.096 [LogStash::Runner] runner - Logstash shut down.

・推測通り、テーブル2(sample_tb_2)のデータでマッピングエラーが発生したことがわかります。

kibanaでデータ投入がされているか確認

・kibanaで確認すると、マッピングエラーが発生しなかったデータを含め、データが1件も投入されていない状況です。

GET /20220523_sample_tb/_search?pretty=true
{
  "query": {"match_all": {}}
}
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  }
}

推測

・マッピングエラーが発生しなかったデータを含め、データが1件も投入されていない
→上記の検証結果から<想定2>のように、input filter outputの各プラグインは、各プラグイン内での処理が全て完了しないと次のフェーズに進まないのでは?と推測しましたが、いかがでしょうか。
(偏見かもしれませんが)<想定1>の方が、自然な動きのような気がするのですが・・・

追記

Tomo_Mさまが仰った

"SELECT * from example_table"の結果をJDBC input pluginが取得し、得られた結果を1行ごとに1つの「イベント」として(内部的な)キューに格納

は、私としてはログのこちらの部分と推測しています。

[INFO ] 2022-05-23 05:06:33.075 [[main]<jdbc] jdbc - (0.019348s) SELECT * from sample_tb_1,sample_tb_2

検証ありがとうございます。確かに、filter/outputで各イベントがまとめて処理されていそうですね…。

opensearchのものですが、こちらにLogstashの実行モデルについて解説がありました。貼っていただいたLogstash本家のものより少し詳細で分かりやすいかもしれません。logstash.ymlで設定可能なbatch.sizeとpipeline.workerを考える必要があるようです。「各workerが、input pluginからbatch.size数のイベントを切り出して、filter plugin → output pluginとバッチ処理する」と言う流れのようです。誤った回答で申し訳ありませんでした。

OpenSearch/OpenDistro are AWS run products and differ from the original Elasticsearch and Kibana products that Elastic builds and maintains. You may need to contact them directly for further assistance.

(This is an automated response from your friendly Elastic bot. Please report this post if you have any suggestions or concerns :elasticheart: )

Tomo_Mさま

ご返信ありがとうございます。
毎度ご返信が遅くなり、申し訳ございません。

リンク先のご共有ありがとうございます。
Tomo_Mさまからご教示頂いた知見を元に調査したところ、本家Logstashのドキュメントにもpipeline.workers、pipeline.batch.sizeの説明がありました。

Tuning and Profiling Logstash Performance | Logstash Reference [8.2] | Elastic

上記本家ドキュメントの内容を読み解くと、

  • pipeline.workers→filter pluginでの処理とoutput pluginでの処理に使用可能なスレッド数

  • pipeline.batch.size→input pluginでの処理から収集する最大イベント数

また、私の手元の書籍にも、

本パラメータの値が大きければ大きいほど、メモリキューに溜まっているイベントを1回でまとめて拾って、Filter処理に回すことができます

※引用元: Elastic Stack実践ガイド[Logstash/Beats編] (impress top gear)

という記述がありますので、pipeline.batch.sizeはメモリキューから指定した数のイベントを切り出して、filter pluginに渡しているようですね。

ーー
本家ドキュメントには、

inflight count(実行中のイベントの総数)についての記述もありました。

The total number of inflight events is determined by the product of the pipeline.workers and pipeline.batch.size settings.

  • inflight count=pipeline.workers(スレッド数)×pipeline.batch.size(まとめて処理するイベント数)

ということで、Tomo_Mさまの仰る通り、パフォーマンス向上を検討する際にはpipeline.workersとpipeline.batch.sizeを検討する必要がありそうです。

ーー
少し話がそれましたが、
ご共有頂いたOpensearchのドキュメントを読み解くと、

Each pipeline worker also runs within its own thread meaning that Logstash processes multiple events simultaneously.
Instead of executing that code 100 times for 100 events, it’s more efficient to execute it once for a batch of 100 events.

filter pluginとoutput pluginが複数のイベントを同時かつまとめて処理することは、まず間違いなさそうですね。

イメージとしては、このような感じでしょうか。

"SELECT * from example_table"の結果をJDBC input pluginが取得

得られた結果を1行ごとに1つの「イベント」として(内部的な)キューに格納

filter pluginが、pipeline.batch.sizeで指定した数のイベントをまとめて処理

output pluginが、pipeline.batch.sizeで指定した数のイベントをまとめて処理

Tomo_Mさまにご協力頂き、当方としてもかなり調査が進みました。
誠にありがとうございました。

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.