Tomo_M,
Thank you for your reply.
My apologies for the slow response.
"Inserting some data partway through that will raise an error when indexed into Elasticsearch, then checking the results, should make the behavior clear. If you do try it, I would be glad to hear the outcome."
I tried this; the results are below.
Database data
<Table 1> → the birthday column is defined as type date
select * from sample_tb_1;
+-----------+-------+------------+
| name | memo | birthday |
+-----------+-------+------------+
| testname1 | test1 | 2016-01-01 |
| testname2 | test2 | 2016-01-01 |
+-----------+-------+------------+
show columns from sample_tb_1;
+----------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+-------+
| name | varchar(20) | YES | | NULL | |
| memo | varchar(20) | YES | | NULL | |
| birthday | date | YES | | NULL | |
+----------+-------------+------+-----+---------+-------+
<Table 2> → the birthday column is defined as type varchar
select * from sample_tb_2;
+-----------+-------+----------+
| name | memo | birthday |
+-----------+-------+----------+
| testname3 | test3 | January |
| testname4 | test4 | January |
+-----------+-------+----------+
show columns from sample_tb_2;
+----------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+-------+
| name | varchar(20) | YES | | NULL | |
| memo | varchar(20) | YES | | NULL | |
| birthday | varchar(20) | YES | | NULL | |
+----------+-------------+------+-----+---------+-------+
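For reference, the two sample tables can be reproduced with something like the following (a sketch reconstructed from the SELECT and SHOW COLUMNS output above; column sizes match the listings):

```sql
-- Reconstructed from the output above; all columns are nullable, as shown
CREATE TABLE sample_tb_1 (
  name     VARCHAR(20),
  memo     VARCHAR(20),
  birthday DATE
);
INSERT INTO sample_tb_1 VALUES
  ('testname1', 'test1', '2016-01-01'),
  ('testname2', 'test2', '2016-01-01');

CREATE TABLE sample_tb_2 (
  name     VARCHAR(20),
  memo     VARCHAR(20),
  birthday VARCHAR(20)
);
INSERT INTO sample_tb_2 VALUES
  ('testname3', 'test3', 'January'),
  ('testname4', 'test4', 'January');
```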
Elasticsearch mapping definition
→ the birthday field is mapped as type date
PUT /20220523_sample_tb
{
  "mappings": {
    "properties": {
      "name":     { "type": "text" },
      "memo":     { "type": "text" },
      "birthday": { "type": "date" }
    }
  }
}
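As an aside, the parse failure in the log further down comes from the default date format (strict_date_optional_time||epoch_millis), which cannot parse "January". If the goal were to index such documents anyway while discarding only the unparsable field value, the ignore_malformed mapping parameter could be set on the date field. A sketch (the index name here is just an example):

```
PUT /20220523_sample_tb_lenient
{
  "mappings": {
    "properties": {
      "name":     { "type": "text" },
      "memo":     { "type": "text" },
      "birthday": { "type": "date", "ignore_malformed": true }
    }
  }
}
```

With this mapping, documents whose birthday cannot be parsed are still indexed; only the malformed field is skipped.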
Logstash conf file
input {
  jdbc {
    # jdbc_connection_string, jdbc_user, and jdbc_password are omitted here
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT * from sample_tb_1,sample_tb_2"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "20220523_sample_tb"
  }
}
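One note on the statement: `SELECT * from sample_tb_1,sample_tb_2` is a comma join, i.e. the Cartesian product of the two tables, and the result set carries duplicate column names. If the intent is to pull each table's rows independently, a UNION ALL along these lines might be closer to it (a sketch; the CAST keeps the birthday column type consistent across the two branches):

```
statement => "SELECT name, memo, CAST(birthday AS CHAR) AS birthday FROM sample_tb_1
              UNION ALL
              SELECT name, memo, birthday FROM sample_tb_2"
```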
・With the conf set up as above, my prediction was that no mapping error would occur when ingesting table 1 (sample_tb_1), but that ingesting table 2 (sample_tb_2) would raise a mapping error due to the type mismatch on the birthday column.
Logs
[INFO ] 2022-05-23 05:06:30.884 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/etc/logstash/conf.d/jdbc_20220523_sample_tb.conf"], :thread=>"#<Thread:0x3b5fb425 run>"}
[INFO ] 2022-05-23 05:06:31.738 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>0.85}
[INFO ] 2022-05-23 05:06:31.868 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2022-05-23 05:06:31.933 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO ] 2022-05-23 05:06:33.075 [[main]<jdbc] jdbc - (0.019348s) SELECT * from sample_tb_1,sample_tb_2
[WARN ] 2022-05-23 05:06:33.479 [[main]>worker0] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"20220523_sample_tb", :routing=>nil}, {"name"=>"testname3", "@timestamp"=>2022-05-23T05:06:33.129Z, "birthday"=>"January", "@version"=>"1", "memo"=>"test3"}], :response=>{"index"=>{"_index"=>"20220523_sample_tb", "_type"=>"_doc", "_id"=>"wLZP74AB9E9jQ0pHTUti", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [birthday] of type [date] in document with id 'wLZP74AB9E9jQ0pHTUti'. Preview of field's value: 'January'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [January] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
[WARN ] 2022-05-23 05:06:33.481 [[main]>worker0] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"20220523_sample_tb", :routing=>nil}, {"name"=>"testname3", "@timestamp"=>2022-05-23T05:06:33.139Z, "birthday"=>"January", "@version"=>"1", "memo"=>"test3"}], :response=>{"index"=>{"_index"=>"20220523_sample_tb", "_type"=>"_doc", "_id"=>"wbZP74AB9E9jQ0pHTUti", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [birthday] of type [date] in document with id 'wbZP74AB9E9jQ0pHTUti'. Preview of field's value: 'January'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [January] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
[WARN ] 2022-05-23 05:06:33.481 [[main]>worker0] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"20220523_sample_tb", :routing=>nil}, {"name"=>"testname4", "@timestamp"=>2022-05-23T05:06:33.140Z, "birthday"=>"January", "@version"=>"1", "memo"=>"test4"}], :response=>{"index"=>{"_index"=>"20220523_sample_tb", "_type"=>"_doc", "_id"=>"wrZP74AB9E9jQ0pHTUti", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [birthday] of type [date] in document with id 'wrZP74AB9E9jQ0pHTUti'. Preview of field's value: 'January'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [January] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
[WARN ] 2022-05-23 05:06:33.482 [[main]>worker0] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"20220523_sample_tb", :routing=>nil}, {"name"=>"testname4", "@timestamp"=>2022-05-23T05:06:33.141Z, "birthday"=>"January", "@version"=>"1", "memo"=>"test4"}], :response=>{"index"=>{"_index"=>"20220523_sample_tb", "_type"=>"_doc", "_id"=>"w7ZP74AB9E9jQ0pHTUti", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [birthday] of type [date] in document with id 'w7ZP74AB9E9jQ0pHTUti'. Preview of field's value: 'January'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [January] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
[INFO ] 2022-05-23 05:06:33.738 [[main]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"main"}
[INFO ] 2022-05-23 05:06:34.063 [Converge PipelineAction::Delete<main>] pipelinesregistry - Removed pipeline from registry successfully {:pipeline_id=>:main}
[INFO ] 2022-05-23 05:06:34.096 [LogStash::Runner] runner - Logstash shut down.
・As predicted, the data from table 2 (sample_tb_2) caused mapping errors.
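For what it's worth, the elasticsearch output plugin treats a 400 mapper_parsing_exception as non-retriable: each failed event is logged with a WARN like the ones above and then dropped. If those events need to be kept for inspection, Logstash's dead letter queue can be enabled. A sketch (the path is an example and depends on the installation):

```
# logstash.yml
dead_letter_queue.enable: true
```

A separate pipeline can then read the failed events back:

```
input {
  dead_letter_queue {
    path => "/usr/share/logstash/data/dead_letter_queue"
    commit_offsets => true
  }
}
```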
Checking in Kibana whether the data was loaded
・Checking in Kibana, not a single document has been indexed, including the data that did not hit a mapping error.
GET /20220523_sample_tb/_search?pretty=true
{
  "query": { "match_all": {} }
}
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 0,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
}
}
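The same check can also be made with the count API (equivalent to the match_all search above for this purpose):

```
GET /20220523_sample_tb/_count
```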
My inference
・Not a single document was indexed, including the data that did not hit a mapping error.
→ From the verification results above I suspect, as in <Assumption 2>, that each of the input, filter, and output plugins does not hand off to the next phase until all processing inside that plugin has completed. Does that match your understanding?
(It may just be my own bias, but <Assumption 1> feels like the more natural behavior to me...)
Addendum
Regarding your statement that
"the JDBC input plugin fetches the result of 'SELECT * from example_table' and places each resulting row into an (internal) queue as a single 'event'",
I believe this corresponds to the following part of the log:
[INFO ] 2022-05-23 05:06:33.075 [[main]<jdbc] jdbc - (0.019348s) SELECT * from sample_tb_1,sample_tb_2