RDS -> (Logstash) -> Elasticsearch の取り込みができない

バージョン: logstash:7.6.1 ( https://hub.docker.com/_/logstash?tab=tags )

Logstash Pipeline を利用した AWS RDS (MySQL 5.7.12) からの AWS Elasticsearch(7.1) へのデータ流し込みでつまづいてしまっています。
おそらく該当のテーブルに geometry型のカラムがあるためではないかと思い、以下2点を試したのですが取り込みができていません。
取り込みが成功するための方法がわかれば教えていただきたいです。

(1) geometry型 のカラムを ST_AsText でテキストにして取り込みを試みましたが、SQLの statement が長いためか
並び順の次の次くらいのカラムが存在しないというエラーになってしまいました。

[ERROR][logstash.inputs.jdbc     ][(問題のテーブル)] Java::JavaSql::SQLSyntaxErrorException: Unknown column '(エラーになったカラム1)' in 'field list': ...

(エラーになったカラム1を外してリトライ)
[ERROR][logstash.inputs.jdbc     ][(問題のテーブル)] Java::JavaSql::SQLSyntaxErrorException: Unknown column '(エラーになったカラム1の次のカラム2)' in 'field list': ...
  • ここでお聞きしたいのは、statement のSELECT SQL 文に長さのMAXがあるのか?ということです。

(2) 次に試したのは、geometry型をテキストにしてビューにして、そのビューを取り込みのターゲットにしようとしました。
そうしたところ、そのビューは存在しないという以下のようなエラーになりました。
(設定に定義したユーザーで単体での接続・取得ができることは確認しています。権限ではなさそう)

[WARN ][logstash.inputs.jdbc     ][(問題のテーブル)] Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::JavaSql::SQLSyntaxErrorException: Table '(DB名).view_(問題のテーブル名)' doesn't exist>},
  • この状況での質問は、ビューを利用した取り込みはできない、ということなのでしょうか?

コメントいただけるとありがたいです。
よろしくお願いいたします。

設定・状況類

pipelines.yml

- pipeline.id: hoge_fungar_geos
  pipeline.batch.size: 250
  pipeline.workers: 1
  path.config: "/usr/share/logstash/pipeline/hoge_fungar_geos.conf"

pipeline/hoge_fungar_geos.conf

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/${JDBC_DRIVER_FILENAME}"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://${RDS_ENDPOINT}:3306/${RDS_DATABASE}"
    jdbc_default_timezone => "${TZ}"
    jdbc_user => "${RDS_USERNAME}"
    jdbc_password => "${RDS_PASSWORD}"
    jdbc_paging_enabled => true
    tracking_column => "updated_at"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "* */1 * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(updated_at) AS unix_ts_in_secs FROM view_hoge_fungar_geos WHERE (UNIX_TIMESTAMP(updated_at) > :sql_last_value AND updated_at < NOW()) ORDER BY updated_at ASC"
  }
}

filter {
  mutate {
    copy => {"meta_hoge_fungar_id" => "[@metadata][_hoge_fungar_id]"}
    remove_field => ["meta_hoge_fungar_id", "@version", "unix_ts_in_secs"]
  }
}

output {
  elasticsearch {
    hosts => "${ES_ENDPOINT}:443"
    ssl => true
    ilm_enabled => false
    index => "hoge_fungar_geos"
    document_id => "%{hoge_fungar_id}"
  }
}

DESC view_hoge_fungar_geos

+-----------------------------+--------------+------+-----+---------+-------+
| Field                       | Type         | Null | Key | Default | Extra |
+-----------------------------+--------------+------+-----+---------+-------+
| hoge_fungar_id              | int(11)      | NO   |     | NULL    |       |
| geom_xxx1                   | longtext     | YES  |     | NULL    |       |
| xxxxxxxx_xxx2               | varchar(255) | YES  |     | NULL    |       |
| xxxxxx3                     | longtext     | YES  |     | NULL    |       |
| xxxxxx4                     | text         | YES  |     | NULL    |       |
| xxxxx_xxxxxx5               | varchar(255) | YES  |     | NULL    |       |
| xxxxxxx_xxx6                | varchar(255) | YES  |     | NULL    |       |
| updated_at                  | datetime     | YES  |     | NULL    |       |
| xxxxxxxx_xxxxxx7_at         | datetime     | YES  |     | NULL    |       |
| xxxxxx_xxx8                 | varchar(255) | YES  |     | NULL    |       |
| xxxxxxxxxxxxxx_xxxx_xxxxx_9 | varchar(255) | YES  |     | NULL    |       |
| xxxxxx10                    | varchar(255) | YES  |     | NULL    |       |
| xxxxxxxxxx_xxxxx_11         | varchar(255) | YES  |     | NULL    |       |
| xxxxxxxxxx_xxxxx_12         | varchar(255) | YES  |     | NULL    |       |
| xxxxxxxxxx_xxxxx_13         | varchar(255) | YES  |     | NULL    |       |
| xxxxxxxxxx_xxxxx_14         | varchar(255) | YES  |     | NULL    |       |
| xxxxx15                     | varchar(255) | YES  |     | NULL    |       |
| xxx16                       | varchar(255) | YES  |     | NULL    |       |
+-----------------------------+--------------+------+-----+---------+-------+

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.