Sync multiple MySQL tables with Logstash

Hello, please take a look at my Logstash conf file:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql_8:3306/pcs_accounts_db"
    jdbc_user => "pcs_db_user"
    jdbc_password => "laravel_db"
    sql_log_level => "debug"  
    clean_run => true 
    record_last_run => false
    statement => "SELECT * FROM pcs_accounts_db.ac_transaction_dump"
    type => 'transaction'
  }

  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql_8:3306/pcs_accounts_db"
    jdbc_user => "pcs_db_user"
    jdbc_password => "laravel_db"
    sql_log_level => "debug"  
    clean_run => true 
    record_last_run => false
    statement => "SELECT * FROM pcs_accounts_db.ac_daily_trial_balance"
    type => 'trial'
  }
}

filter {  
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

output {
  # stdout { codec => rubydebug { metadata => true } }
  if [type] == "transaction" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "ac_transaction_dump"
      data_stream => false
    }
  }
  if [type] == "trial" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "ac_daily_trial_balance"
      data_stream => false
    }
  }
}

Note that I just added two jdbc inputs, as suggested in the documentation, to sync multiple tables.
But this is not working.

The Logstash logs in Docker are as follows:

Using bundled JDK: /usr/share/logstash/jdk
2024-03-20 18:21:14 /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_int
2024-03-20 18:21:14 /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_f
2024-03-20 18:21:18 Sending Logstash logs to /usr/share/logstash/logs which is now configured via log4j2.properties
2024-03-20 18:21:18 [2024-03-20T12:51:18,683][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/share/logstash/config/log4j2.properties
2024-03-20 18:21:18 [2024-03-20T12:51:18,689][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.12.2", "jruby.version"=>"jruby 9.4.5.0 (3.1.4) 2023-11-02 1abae2700f OpenJDK 64-Bit Server VM 17.0.10+7 on 17.0.10+7 +indy +jit [x86_64-linux]"}
2024-03-20 18:21:18 [2024-03-20T12:51:18,692][INFO ][logstash.runner          ] JVM bootstrap flags: [-XX:+HeapDumpOnOutOfMemoryError, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, -Djruby.regexp.interruptible=true, --add-opens=java.base/java.security=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11, -Dlog4j2.isThreadContextMapInheritable=true, -Xms1g, -Dlogstash.jackson.stream-read-constraints.max-string-length=200000000, -Djdk.io.File.enableADS=true, -Dfile.encoding=UTF-8, --add-opens=java.base/java.io=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, -Djruby.compile.invokedynamic=true, -Xmx1g, -Djava.security.egd=file:/dev/urandom, -Djava.awt.headless=true, -Dls.cgroup.cpuacct.path.override=/, -Dls.cgroup.cpu.path.override=/, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED]
2024-03-20 18:21:18 [2024-03-20T12:51:18,694][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-string-length` configured to `200000000`
2024-03-20 18:21:18 [2024-03-20T12:51:18,694][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-number-length` configured to `10000`
2024-03-20 18:21:19 [2024-03-20T12:51:19,218][WARN ][deprecation.logstash.monitoringextension.pipelineregisterhook] Internal collectors option for Logstash monitoring is deprecated and targeted for removal in the next major version.
2024-03-20 18:21:19 Please configure Elastic Agent to monitor Logstash. Documentation can be found at: 
2024-03-20 18:21:19 https://www.elastic.co/guide/en/logstash/current/monitoring-with-elastic-agent.html
2024-03-20 18:21:19 [2024-03-20T12:51:19,443][INFO ][logstash.licensechecker.licensereader] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elasticsearch:9200/]}}
2024-03-20 18:21:19 [2024-03-20T12:51:19,521][WARN ][logstash.licensechecker.licensereader] Restored connection to ES instance {:url=>"http://elasticsearch:9200/"}
2024-03-20 18:21:19 [2024-03-20T12:51:19,522][INFO ][logstash.licensechecker.licensereader] Elasticsearch version determined (8.12.2) {:es_version=>8}
2024-03-20 18:21:19 [2024-03-20T12:51:19,523][WARN ][logstash.licensechecker.licensereader] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>8}
2024-03-20 18:21:19 [2024-03-20T12:51:19,555][INFO ][logstash.monitoring.internalpipelinesource] Monitoring License OK
2024-03-20 18:21:19 [2024-03-20T12:51:19,555][INFO ][logstash.monitoring.internalpipelinesource] Validated license for monitoring. Enabling monitoring pipeline.
2024-03-20 18:21:19 [2024-03-20T12:51:19,649][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
2024-03-20 18:21:20 [2024-03-20T12:51:20,085][INFO ][org.reflections.Reflections] Reflections took 149 ms to scan 1 urls, producing 132 keys and 468 values
2024-03-20 18:21:22 [2024-03-20T12:51:22,004][INFO ][logstash.javapipeline    ] Pipeline `.monitoring-logstash` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
2024-03-20 18:21:22 [2024-03-20T12:51:22,022][INFO ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearchMonitoring", :hosts=>["http://elasticsearch:9200"]}
2024-03-20 18:21:22 [2024-03-20T12:51:22,032][INFO ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elasticsearch:9200/]}}
2024-03-20 18:21:22 [2024-03-20T12:51:22,047][WARN ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Restored connection to ES instance {:url=>"http://elasticsearch:9200/"}
2024-03-20 18:21:22 [2024-03-20T12:51:22,048][INFO ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Elasticsearch version determined (8.12.2) {:es_version=>8}
2024-03-20 18:21:22 [2024-03-20T12:51:22,050][WARN ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>8}
2024-03-20 18:21:22 [2024-03-20T12:51:22,062][WARN ][logstash.javapipeline    ][.monitoring-logstash] 'pipeline.ordered' is enabled and is likely less efficient, consider disabling if preserving event order is not necessary
2024-03-20 18:21:22 [2024-03-20T12:51:22,073][INFO ][logstash.javapipeline    ][.monitoring-logstash] Starting pipeline {:pipeline_id=>".monitoring-logstash", "pipeline.workers"=>1, "pipeline.batch.size"=>2, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2, "pipeline.sources"=>["monitoring pipeline"], :thread=>"#<Thread:0x688b7fae /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}
2024-03-20 18:21:22 [2024-03-20T12:51:22,115][INFO ][logstash.javapipeline    ] Pipeline `base-pipeline` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
2024-03-20 18:21:22 [2024-03-20T12:51:22,129][INFO ][logstash.outputs.elasticsearch][base-pipeline] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9200"]}
2024-03-20 18:21:22 [2024-03-20T12:51:22,137][INFO ][logstash.outputs.elasticsearch][base-pipeline] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
2024-03-20 18:21:22 [2024-03-20T12:51:22,144][INFO ][logstash.outputs.elasticsearch][base-pipeline] Failed to perform request {:message=>"Connect to localhost:9200 [localhost/127.0.0.1] failed: Connection refused", :exception=>Manticore::SocketException, :cause=>#<Java::OrgApacheHttpConn::HttpHostConnectException: Connect to localhost:9200 [localhost/127.0.0.1] failed: Connection refused>}
2024-03-20 18:21:22 [2024-03-20T12:51:22,146][WARN ][logstash.outputs.elasticsearch][base-pipeline] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"http://localhost:9200/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :message=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connect to localhost:9200 [localhost/127.0.0.1] failed: Connection refused"}
2024-03-20 18:21:22 [2024-03-20T12:51:22,148][INFO ][logstash.outputs.elasticsearch][base-pipeline] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://elasticsearch:9200"]}
2024-03-20 18:21:22 [2024-03-20T12:51:22,153][INFO ][logstash.outputs.elasticsearch][base-pipeline] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elasticsearch:9200/]}}
2024-03-20 18:21:22 [2024-03-20T12:51:22,166][WARN ][logstash.outputs.elasticsearch][base-pipeline] Restored connection to ES instance {:url=>"http://elasticsearch:9200/"}
2024-03-20 18:21:22 [2024-03-20T12:51:22,167][INFO ][logstash.outputs.elasticsearch][base-pipeline] Elasticsearch version determined (8.12.2) {:es_version=>8}
2024-03-20 18:21:22 [2024-03-20T12:51:22,167][WARN ][logstash.outputs.elasticsearch][base-pipeline] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>8}
2024-03-20 18:21:22 [2024-03-20T12:51:22,179][INFO ][logstash.outputs.elasticsearch][base-pipeline] Using a default mapping template {:es_version=>8, :ecs_compatibility=>:v8}
2024-03-20 18:21:22 [2024-03-20T12:51:22,188][INFO ][logstash.javapipeline    ][base-pipeline] Starting pipeline {:pipeline_id=>"base-pipeline", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["/usr/share/logstash/pipeline/base.conf"], :thread=>"#<Thread:0x4ac9bf3f /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}
2024-03-20 18:21:22 [2024-03-20T12:51:22,628][INFO ][logstash.javapipeline    ][.monitoring-logstash] Pipeline Java execution initialization time {"seconds"=>0.55}
2024-03-20 18:21:22 [2024-03-20T12:51:22,649][INFO ][logstash.javapipeline    ][.monitoring-logstash] Pipeline started {"pipeline.id"=>".monitoring-logstash"}
2024-03-20 18:21:22 [2024-03-20T12:51:22,809][INFO ][logstash.javapipeline    ][base-pipeline] Pipeline Java execution initialization time {"seconds"=>0.62}
2024-03-20 18:21:23 [2024-03-20T12:51:23,338][INFO ][logstash.inputs.jdbc     ][base-pipeline] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
2024-03-20 18:21:23 [2024-03-20T12:51:23,366][INFO ][logstash.inputs.jdbc     ][base-pipeline] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
2024-03-20 18:21:23 [2024-03-20T12:51:23,367][INFO ][logstash.javapipeline    ][base-pipeline] Pipeline started {"pipeline.id"=>"base-pipeline"}
2024-03-20 18:21:23 [2024-03-20T12:51:23,383][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :"base-pipeline"], :non_running_pipelines=>[]}
2024-03-20 18:21:24 [2024-03-20T12:51:24,103][INFO ][logstash.outputs.elasticsearch][base-pipeline][364684caa67281d0c4870e968a15ed5790819f1eb13fdb319376991150a31308] Aborting the batch due to shutdown request while waiting for connections to become live
2024-03-20 18:21:24 [2024-03-20T12:51:24,118][INFO ][logstash.outputs.elasticsearch][base-pipeline][364684caa67281d0c4870e968a15ed5790819f1eb13fdb319376991150a31308] Aborting the batch due to shutdown request while waiting for connections to become live
2024-03-20 18:21:24 [2024-03-20T12:51:24,122][INFO ][org.logstash.execution.WorkerLoop][base-pipeline] Received signal to abort processing current batch. Terminating pipeline worker [base-pipeline]>worker6
2024-03-20 18:21:24 [2024-03-20T12:51:24,124][INFO ][logstash.outputs.elasticsearch][base-pipeline][364684caa67281d0c4870e968a15ed5790819f1eb13fdb319376991150a31308] Aborting the batch due to shutdown request while waiting for connections to become live
2024-03-20 18:21:24 [2024-03-20T12:51:24,125][INFO ][logstash.outputs.elasticsearch][base-pipeline][364684caa67281d0c4870e968a15ed5790819f1eb13fdb319376991150a31308] Aborting the batch due to shutdown request while waiting for connections to become live
2024-03-20 18:21:24 [2024-03-20T12:51:24,126][INFO ][org.logstash.execution.WorkerLoop][base-pipeline] Received signal to abort processing current batch. Terminating pipeline worker [base-pipeline]>worker3
2024-03-20 18:21:24 [2024-03-20T12:51:24,126][INFO ][org.logstash.execution.WorkerLoop][base-pipeline] Received signal to abort processing current batch. Terminating pipeline worker [base-pipeline]>worker5
2024-03-20 18:21:24 [2024-03-20T12:51:24,128][INFO ][org.logstash.execution.WorkerLoop][base-pipeline] Received signal to abort processing current batch. Terminating pipeline worker [base-pipeline]>worker7
2024-03-20 18:21:24 [2024-03-20T12:51:24,162][INFO ][logstash.outputs.elasticsearch][base-pipeline][364684caa67281d0c4870e968a15ed5790819f1eb13fdb319376991150a31308] Aborting the batch due to shutdown request while waiting for connections to become live
2024-03-20 18:21:24 [2024-03-20T12:51:24,163][INFO ][org.logstash.execution.WorkerLoop][base-pipeline] Received signal to abort processing current batch. Terminating pipeline worker [base-pipeline]>worker4
2024-03-20 18:21:24 [2024-03-20T12:51:24,235][INFO ][logstash.outputs.elasticsearch][base-pipeline][364684caa67281d0c4870e968a15ed5790819f1eb13fdb319376991150a31308] Aborting the batch due to shutdown request while waiting for connections to become live
2024-03-20 18:21:24 [2024-03-20T12:51:24,237][INFO ][org.logstash.execution.WorkerLoop][base-pipeline] Received signal to abort processing current batch. Terminating pipeline worker [base-pipeline]>worker1
2024-03-20 18:21:24 [2024-03-20T12:51:24,234][INFO ][logstash.outputs.elasticsearch][base-pipeline][364684caa67281d0c4870e968a15ed5790819f1eb13fdb319376991150a31308] Aborting the batch due to shutdown request while waiting for connections to become live
2024-03-20 18:21:24 [2024-03-20T12:51:24,240][INFO ][org.logstash.execution.WorkerLoop][base-pipeline] Received signal to abort processing current batch. Terminating pipeline worker [base-pipeline]>worker2
2024-03-20 18:21:24 [2024-03-20T12:51:24,380][INFO ][logstash.outputs.elasticsearch][base-pipeline][364684caa67281d0c4870e968a15ed5790819f1eb13fdb319376991150a31308] Aborting the batch due to shutdown request while waiting for connections to become live
2024-03-20 18:21:24 [2024-03-20T12:51:24,381][INFO ][org.logstash.execution.WorkerLoop][base-pipeline] Received signal to abort processing current batch. Terminating pipeline worker [base-pipeline]>worker0
2024-03-20 18:21:26 [2024-03-20T12:51:26,178][INFO ][logstash.javapipeline    ][base-pipeline] Pipeline terminated {"pipeline.id"=>"base-pipeline"}
2024-03-20 18:21:26 [2024-03-20T12:51:26,403][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:"base-pipeline"}
2024-03-20 18:21:27 [2024-03-20T12:51:27,063][INFO ][logstash.javapipeline    ][.monitoring-logstash] Pipeline terminated {"pipeline.id"=>".monitoring-logstash"}
2024-03-20 18:21:27 [2024-03-20T12:51:27,417][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:".monitoring-logstash"}
2024-03-20 18:21:27 [2024-03-20T12:51:27,424][INFO ][logstash.runner          ] Logstash shut down.

Please help if anyone has any idea, and let me know if you need any more info.

In the Kibana UI, when querying, I am getting:

{
  "error": {
    "root_cause": [
      {
        "type": "index_not_found_exception",
        "reason": "no such index [ac_daily_trail_balance]",
        "resource.type": "index_or_alias",
        "resource.id": "ac_daily_trail_balance",
        "index_uuid": "_na_",
        "index": "ac_daily_trail_balance"
      }
    ],
    "type": "index_not_found_exception",
    "reason": "no such index [ac_daily_trail_balance]",
    "resource.type": "index_or_alias",
    "resource.id": "ac_daily_trail_balance",
    "index_uuid": "_na_",
    "index": "ac_daily_trail_balance"
  },
  "status": 404
}

@dadoonet Sorry for the ping, but please take a look at the issue.

What? Only 21 hours after your initial post?

Please don't do that. If you need an SLA, ask for official support.
Otherwise, just be patient and wait for the community to answer.


Because I can't live with an issue for 24 hours :sweat_smile:

Well, I resolved the issue. You just need to add multiple elasticsearch output plugins, one per jdbc input:

output {

  stdout { codec => rubydebug { metadata => true } }
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    data_stream => "false"
    index => "ac_transaction_dump"
    document_id => "%{transaction_dump_id}"
  }

  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    data_stream => "false"
    index => "ac_daily_trial_balance"
    document_id => "%{daily_trial_balance_id}"
  }
}

Note: if anyone is following along, remove `type` from the jdbc input plugin, because it's deprecated.
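Instead of `type`, a variant using `tags` plus conditionals can route each input to its own output. This is only a sketch (untested; the tag values and the `*_id` document-ID fields are assumptions based on the queries above), but it avoids every event reaching both elasticsearch outputs, which is what happens when no conditionals are used:

```conf
input {
  jdbc {
    # ... same driver/connection/user settings as above ...
    statement => "SELECT * FROM pcs_accounts_db.ac_transaction_dump"
    tags => ["transaction"]
  }
  jdbc {
    # ... same driver/connection/user settings as above ...
    statement => "SELECT * FROM pcs_accounts_db.ac_daily_trial_balance"
    tags => ["trial"]
  }
}

output {
  # Route each table's rows only to its own index, keyed by tag.
  if "transaction" in [tags] {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      data_stream => false
      index => "ac_transaction_dump"
      document_id => "%{transaction_dump_id}"
    }
  } else if "trial" in [tags] {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      data_stream => false
      index => "ac_daily_trial_balance"
      document_id => "%{daily_trial_balance_id}"
    }
  }
}
```

Setting `document_id` from the table's primary-key column also makes repeated full-table syncs upsert rather than duplicate documents.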

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.