Logstash fetching data from multiple MySQL databases

Hi there, I think this community is really great and helpful, especially for someone like me who is new to the ELK stack, barely a week in.

So I've been tasked with using Elasticsearch for a new company project. To get the data into Elasticsearch, I have been testing the jdbc input plugin to fetch data from multiple MySQL databases and tables, which share some common fields.

input {
  jdbc {
    jdbc_driver_library => "my-path/logstash-8.10.3-linux-x86_64/mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/db1" 
    jdbc_user => "user" 
    jdbc_password => "pass" 
    jdbc_paging_enabled => true
    tracking_column => "id"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/45 * * * * *"
    type => "type1"
    statement => "SELECT l.id, NULL as uuid, u.first_name as name, l.id_number, l.amount, l.date_created FROM db1_table1 l INNER JOIN user_profile u ON l.user_profile_id = u.id"
    clean_run => true
  }

  jdbc {
    jdbc_driver_library => "my-path/logstash-8.10.3-linux-x86_64/mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/db1" 
    jdbc_user => "user" 
    jdbc_password => "pass" 
    jdbc_paging_enabled => true
    tracking_column => "id"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/45 * * * * *"
    type => "type2"
    statement => "SELECT id, NULL as uuid, name, id_no as id_number, amount, date_created FROM db1_table2"
    clean_run => true
  }

  jdbc {
    jdbc_driver_library => "my-path/logstash-8.10.3-linux-x86_64/mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/db2" 
    jdbc_user => "user" 
    jdbc_password => "pass" 
    jdbc_paging_enabled => true
    tracking_column => "id"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/45 * * * * *"
    type => "type3"
    statement => "SELECT id, uuid, name, id_number, amount_required as amount, date_created FROM db2_table1"
    clean_run => true
  }
}
filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
  elasticsearch {
    ssl_certificate_authorities => "my-path/elasticsearch-8.10.3/config/certs/http_ca.crt"
    hosts => ["https://localhost:9200"]
    index => "my-index"
    document_id => "%{type}_%{id}"
    user => "myelastic"
    password => "mypass"
  }
}

I guess the most crucial thing I want to know is whether I am doing this right, and whether anything can be improved (open to any form of constructive criticism for the sake of improvement, haha).

The thing is, this currently works just fine because the consolidation only amounts to about 300 rows/documents. I'm just worried about when the number of fetched rows/documents is much larger, probably around 500,000.

I did some tests on another mock database with 500,000 records, and it took roughly 3 minutes to complete.
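
One thing I have been looking at for when the tables get bigger is wiring the tracking column into the query itself, since right now my statements never reference :sql_last_value, so every scheduled run re-reads the whole table. This is roughly what I had in mind for the third input, with clean_run left at its default and the page/fetch sizes just guesses on my part rather than anything I have benchmarked:

jdbc {
  jdbc_driver_library => "my-path/logstash-8.10.3-linux-x86_64/mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar"
  jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/db2"
  jdbc_user => "user"
  jdbc_password => "pass"
  jdbc_paging_enabled => true
  # rows per LIMIT/OFFSET page added by the plugin (default is 100000)
  jdbc_page_size => 50000
  # fetch size hint passed down to the JDBC driver
  jdbc_fetch_size => 10000
  tracking_column => "id"
  use_column_value => true
  tracking_column_type => "numeric"
  schedule => "*/45 * * * * *"
  type => "type3"
  # only pull rows newer than the last tracked id, instead of the whole table;
  # clean_run is left at its default (false) so :sql_last_value persists between runs
  statement => "SELECT id, uuid, name, id_number, amount_required as amount, date_created FROM db2_table1 WHERE id > :sql_last_value ORDER BY id ASC"
}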

Thank you for taking the time to look at my code. Really appreciate it.
