Hello, I am a newbie and I am trying to sync Elasticsearch with my MySQL database (two tables). Even though the tables do sync, I am facing the issues described below.
Code:
base.conf:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql_8:3306/pcs_accounts_db"
    jdbc_user => "pcs_db_user"
    jdbc_password => "laravel_db"
    sql_log_level => "debug"
    clean_run => true
    record_last_run => false
    type => "txn"
    statement => "SELECT * FROM ac_transaction_dump"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql_8:3306/pcs_accounts_db"
    jdbc_user => "pcs_db_user"
    jdbc_password => "laravel_db"
    sql_log_level => "debug"
    clean_run => true
    record_last_run => false
    type => "trial"
    statement => "SELECT * FROM ac_daily_trial_balance"
  }
}

filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

output {
  stdout { codec => rubydebug { metadata => true } }
  if [type] == "txn" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      data_stream => "false"
      index => "ac_transaction_dump"
      document_id => "%{transaction_dump_id}"
    }
  }
  if [type] == "trial" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      data_stream => "false"
      index => "ac_daily_trial_balance"
      document_id => "%{daily_trial_balance_id}"
    }
  }
}
change.conf:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql_8:3306/pcs_accounts_db"
    jdbc_user => "pcs_db_user"
    jdbc_password => "laravel_db"
    type => "txn"
    use_column_value => true
    tracking_column => "transaction_dump_id"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run_a"
    sql_log_level => "debug"
    schedule => "*/5 * * * * *"
    statement => "
      SELECT *
      FROM ac_transaction_dump
      WHERE (created_at > :sql_last_value)
         OR (updated_at > :sql_last_value);
    "
  }
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql_8:3306/pcs_accounts_db"
    jdbc_user => "pcs_db_user"
    jdbc_password => "laravel_db"
    type => "trial"
    use_column_value => true
    tracking_column => "daily_trial_balance_id"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run_b"
    sql_log_level => "debug"
    schedule => "*/5 * * * * *"
    statement => "
      SELECT *
      FROM ac_daily_trial_balance
      WHERE (created_at > :sql_last_value)
         OR (updated_at > :sql_last_value);
    "
  }
}

filter {
  if [deleted_at] {
    mutate {
      add_field => { "[@metadata][action]" => "delete" }
    }
  }
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

# stdout { codec => rubydebug { metadata => true } }
output {
  if [type] == "txn" {
    if [@metadata][action] == "delete" {
      elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "ac_transaction_dump"
        action => "delete"
        document_id => "%{transaction_dump_id}"
      }
    } else {
      elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "ac_transaction_dump"
        document_id => "%{transaction_dump_id}"
      }
    }
  }
  if [type] == "trial" {
    if [@metadata][action] == "delete" {
      elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "ac_daily_trial_balance"
        action => "delete"
        document_id => "%{daily_trial_balance_id}"
      }
    } else {
      elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "ac_daily_trial_balance"
        document_id => "%{daily_trial_balance_id}"
      }
    }
  }
}
pipelines.yml:
- pipeline.id: base-pipeline
  path.config: "/usr/share/logstash/pipeline/base.conf"
- pipeline.id: incremental-pipeline
  path.config: "/usr/share/logstash/pipeline/change.conf"
Issues:
After starting Elasticsearch and Kibana, this is what I see under the Index Management tab in Kibana:
After starting Logstash with "docker-compose up -d logstash":
After executing the above queries, the indices' data changes again. Note that I neither added any records to the database nor updated or deleted anything.
When I executed the queries again for both indices, the storage size and docs_count increased once more without any change in the database.
Also note that even though there is no change in the database, executing the queries takes a long time every run:
Please look at these issues and guide me on where I am going wrong, or tell me if any change is needed in the code.
Note: the tracking_column, which is the primary key of each table, is of type UUID.
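To make the UUID point concrete, here is my understanding of what the incremental statement effectively becomes (please correct me if I am wrong): with use_column_value => true, Logstash substitutes :sql_last_value with the last tracked value, and since tracking_column_type defaults to numeric and no state exists on the first scheduled run, I believe the query that actually hits MySQL looks roughly like this:

```sql
-- Illustration only (my assumption, not copied from the logs):
-- a numeric/UUID last value being compared against DATETIME columns.
SELECT *
FROM ac_transaction_dump
WHERE (created_at > 0)   -- DATETIME compared against a non-datetime value
   OR (updated_at > 0);
```

If that is what runs, it would match every row on every tick, which might explain why docs_count and storage keep growing without any database change.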
Please reply if any more info is needed.