I have two pipelines which sync data from different tables in a postgresql database.
- pipeline.id: logstash_test_user_table
path.config: "/usr/share/logstash/pipeline/logstash.conf"
pipeline.workers: 1
- pipeline.id: logstash_test_addresses_table
path.config: "/usr/share/logstash/pipeline/addresses.conf"
pipeline.workers: 1
The first pipeline works fine. Here is the pipeline:
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/plugins/jdbc/postgresql-42.2.14.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://localhost:5432/db_name"
jdbc_user => "user"
jdbc_password => "pass"
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
clean_run => true
tracking_column_type => "numeric"
schedule => "*/20 * * * * *"
statement => "SELECT *, EXTRACT(epoch from modification_time) AS unix_ts_in_secs FROM users WHERE (EXTRACT(epoch from modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
output {
# stdout { codec => "rubydebug"}
elasticsearch {
hosts => ["localhost:9200"]
index => "rdbms_sync_idx"
document_id => "%{[@metadata][_id]}"
}
}
The second one however gives me an error:
14:54:39.207 [[logstash_test_addresses_table]-pipeline-manager] ERROR logstash.javapipeline - Pipeline aborted due to error {:pipeline_id=>"logstash_test_addresses_table", :exception=>#<Errno::ENOENT: No such file or directory - /usr/share/logstash/.logstash_jdbc_last_run>, :backtrace=>["org/jruby/RubyFile.java:1285:in `delete'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.1/lib/logstash/plugin_mixins/jdbc/value_tracking.rb:112:in `clean'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.1/lib/logstash/plugin_mixins/jdbc/value_tracking.rb:13:in `build_last_value_tracker'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.1/lib/logstash/inputs/jdbc.rb:240:in `register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:216:in `block in register_plugins'", "org/jruby/RubyArray.java:1809:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:215:in `register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:326:in `start_inputs'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:286:in `start_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:170:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:125:in `block in start'"], "pipeline.sources"=>["/usr/share/logstash/pipeline/addresses.conf"], :thread=>"#<Thread:0x1547c557@/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:121 run>"}
Second pipeline:
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/plugins/jdbc/postgresql-42.2.14.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://localhost:5432/db_name"
jdbc_user => "user"
jdbc_password => "pass"
jdbc_paging_enabled => true
tracking_column => "id"
use_column_value => true
clean_run => true
tracking_column_type => "numeric"
schedule => "*/20 * * * * *"
statement => "SELECT * FROM addresses WHERE id > :sql_last_value"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version"]
}
}
output {
# stdout { codec => "rubydebug"}
elasticsearch {
hosts => ["localhost:9200"]
index => "rdbms_sync_idx2"
document_id => "%{[@metadata][_id]}"
}
}
Please help me out I'm unable to understand what this error is. I assume the first pipeline is setting the metadata and second one is trying to read the same metadata.