JDBC Input | Statement Help

Greetings,

I'm trying to create a test pipeline that takes data from MySQL and dumps it into Elasticsearch, and so far so good: I'm finally able to get data into the DB. Now what I would like to do is have Logstash run every minute, but update the index any time a new product is added to the DB. How would I do something like that in the statement?

  input {
      jdbc {
          type => "jdbc-demo"
          jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.44-bin.jar"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_connection_string => "jdbc:mysql://localhost:3306/product_catalog"
          jdbc_user => "root"
          jdbc_password => "1234!"
          schedule => "* * * * *"
          statement => "SELECT name, model, manufacturer FROM products"
      }
  }
  
  output {
      stdout { codec => json_lines }
      elasticsearch {
          hosts => "localhost:9200"
          index => "product_pipeline_import"
          document_id => "%{name}"
      }
  }
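
For what it's worth, the usual way to pull only new rows with the jdbc input is the `:sql_last_value` parameter together with `use_column_value` and `tracking_column`, so each scheduled run picks up where the last one left off. A minimal sketch below — note it assumes the table has a monotonically increasing numeric `id` column, which the `desc products` output doesn't show, so you'd need to add one (or track a last-updated timestamp column instead):

```
  input {
      jdbc {
          type => "jdbc-demo"
          jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.44-bin.jar"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_connection_string => "jdbc:mysql://localhost:3306/product_catalog"
          jdbc_user => "root"
          jdbc_password => "1234!"
          schedule => "* * * * *"
          # Only fetch rows newer than the highest id seen on the previous run.
          # :sql_last_value is substituted by the plugin at query time.
          statement => "SELECT id, name, model, manufacturer FROM products WHERE id > :sql_last_value ORDER BY id ASC"
          use_column_value => true
          tracking_column => "id"
          tracking_column_type => "numeric"
          # The last seen value is persisted here between pipeline runs.
          last_run_metadata_path => "/usr/share/logstash/.jdbc_last_run"
      }
  }
```

Without a tracking column, the full `SELECT` re-runs on every schedule tick, which matches the full-reindex behaviour you're describing further down.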

Here's the test table I'm testing with; nothing fancy:

mysql> desc products;
+---------------+--------------+------+-----+---------+-------+
| Field         | Type         | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+-------+
| sku           | varchar(255) | YES  |     | NULL    |       |
| name          | varchar(255) | YES  |     | NULL    |       |
| type          | varchar(255) | YES  |     | NULL    |       |
| price         | varchar(255) | YES  |     | NULL    |       |
| upc           | varchar(255) | YES  |     | NULL    |       |
| category_id   | varchar(255) | YES  |     | NULL    |       |
| category_name | varchar(255) | YES  |     | NULL    |       |
| shipping      | varchar(255) | YES  |     | NULL    |       |
| description   | varchar(255) | YES  |     | NULL    |       |
| manufacturer  | varchar(255) | YES  |     | NULL    |       |
| model         | varchar(255) | YES  |     | NULL    |       |
| url           | varchar(255) | YES  |     | NULL    |       |
| image         | varchar(255) | YES  |     | NULL    |       |
+---------------+--------------+------+-----+---------+-------+

So it looks like by having the `document_id` and `schedule`, it's catching any updates to the database and updating the index; I verified and tested this with one record. One thing I'm confused about: when I look at Logstash, it seems to run the entire query all over again and start deleting documents, yet I do end up with the expected number of documents in the index. Is this by design? Every time it runs, after a minute a different number of docs shows as deleted, but in the end the document count always matches the count from the database. Here's what I'm seeing:

(xenial)localhost:~$ curl 'localhost:9200/_cat/indices?v'
health status index                   uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   product_pipeline_import j2fszv6GRaeLiALIk9NCQg   5   1      24947         1046      9.3mb          9.3mb

Elasticsearch doesn't update documents in place. When a document is updated, the old version is marked as deleted and the new version is indexed as a fresh document, so each update bumps the `docs.deleted` counter. The deleted documents are physically purged later, when segments are merged in the background. This is described in further detail in the ES documentation.
