Multiple JDBC Inputs in Single Logstash Config File - Unknown Error

Hello,

I've had limited success running multiple (4 concurrent) SQL queries against an Oracle database and pushing their results to my Elasticsearch server.

When these queries run once a minute, the first 1-2 queries complete and insert their data into Elasticsearch, but the remaining queries do not, and Logstash reports the following error:

13:47:00.588 [[main]-pipeline-manager] INFO  logstash.pipeline - Pipeline main started
13:47:00.706 [Api Webserver] INFO  logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
13:48:01.590 [Ruby-0-Thread-28: /opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO  logstash.inputs.jdbc - (0.743000s) SELECT kv.hora_inicio as snap_B_time, kv.hora_fin as snap_E_time ,kv.inicio as start_sn_id,kv.fin as end_start_id, (s2.value - s1.value)/(kv.intervalo*1000000) as ASL  from
     (select iv.start_snap_time as hora_inicio,iv.end_snap_time as hora_fin,iv.start_snap_id as inicio, iv.end_snap_id as fin, extract(hour from (iv.end_snap_time    - iv.start_snap_time))*3600 + extract(minute from (iv.end_snap_time - iv.start_snap_time))*60 + extract(second from
      (iv.end_snap_time - iv.start_snap_time))as intervalo
       from
        (SELECT lag(dbid) over (order by dbid, snap_id) AS start_dbid,
        dbid AS end_dbid,lag(snap_id) over (order by dbid, snap_id) AS start_snap_id, snap_id AS end_snap_id,
        lag(end_interval_time) over (order by dbid, snap_id)
        AS start_snap_time, end_interval_time AS end_snap_time,
       lag(startup_time) over (order by dbid,  snap_id)
       AS start_startup_time, startup_time AS end_startup_time
       FROM sys.wrm$_snapshot) iv
      WHERE iv.start_snap_id IS NOT NULL
      AND iv.start_dbid=iv.end_dbid
      AND iv.start_startup_time=iv.end_startup_time) kv, sys.wrh$_sys_time_model s1, sys.wrh$_sys_time_model s2, v$sysstat n
      where
      kv.inicio = s1.snap_id and
      kv.fin = s2.snap_id and
      n.name = 'DB time' and
      s1.stat_id = s2.stat_id and
      s2.stat_id= n.stat_id and
      kv.hora_inicio > trunc(sysdate - 1)
     order by kv.hora_inicio
13:48:01.690 [Ruby-0-Thread-25: /opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO  logstash.inputs.jdbc - (0.842000s) select enq_time, q_name from WLSTEST_SOAINFRA.EDN_EVENT_QUEUE_TABLE
13:48:01.759 [Ruby-0-Thread-26: /opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO  logstash.inputs.jdbc - (0.979000s) select 'AVL' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_avl
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Unit' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_unit_status
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Event' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_event
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
order by 2 desc
13:48:02.078 [Ruby-0-Thread-27: /opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO  logstash.inputs.jdbc - (1.232000s) select enq_time, q_name, user_Data from wlstest_soainfra.edn_oaoo_delivery_table
13:48:02.758 [[main]>worker0] ERROR logstash.outputs.elasticsearch - An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely {:error_message=>"Direct self-reference leading to cycle (through reference chain: oracle.sql.STRUCT[\"descriptor\"]->oracle.sql.StructDescriptor[\"pickler\"]->oracle.jdbc.oracore.OracleTypeADT[\"connection\"]->oracle.jdbc.driver.T4CConnection[\"wrapper\"])", :error_class=>"LogStash::Json::GeneratorError", :backtrace=>["/opt/logstash-5.5.0/logstash-core/lib/logstash/json.rb:52:in `jruby_dump'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/http_client.rb:116:in `bulk'", "org/jruby/RubyArray.java:2414:in `map'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/http_client.rb:115:in `bulk'", "org/jruby/RubyArray.java:1613:in `each'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/http_client.rb:114:in `bulk'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/common.rb:225:in `safe_bulk'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/common.rb:123:in `submit'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/common.rb:91:in `retrying_submit'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/common.rb:42:in `multi_receive'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:13:in `multi_receive'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/output_delegator.rb:47:in `multi_receive'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/pipeline.rb:420:in `output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/pipeline.rb:419:in `output_batch'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/pipeline.rb:365:in `worker_loop'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/pipeline.rb:330:in `start_workers'"]}

To assist in troubleshooting, I've included my Logstash configuration below.

From what I can tell, it appears to get caught in the rufus-scheduler, but I'm open to any suggestions on how to resolve it.

Thanks in advance.

input {
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***:1521/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select 'AVL' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_avl
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Unit' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_unit_status
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Event' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_event
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
order by 2 desc"
   tracking_column => "rec_date"
   use_column_value => true
   type => "a"
   schedule => "* * * * *"
}
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***:1521/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select enq_time, q_name, user_Data from wlstest_soainfra.edn_oaoo_delivery_table"
   tracking_column => "enq_time"
   use_column_value => true
   type => "b"
   schedule => "* * * * *"
}
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***:1521/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select enq_time, q_name from WLSTEST_SOAINFRA.EDN_EVENT_QUEUE_TABLE"
   tracking_column => "enq_time"
   clean_run => "true"
   type => "c"
   schedule => "* * * * *"
}
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***:1521/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "SELECT kv.hora_inicio as snap_B_time, kv.hora_fin as snap_E_time ,kv.inicio as start_sn_id,kv.fin as end_start_id, (s2.value - s1.value)/(kv.intervalo*1000000) as ASL  from
     (select iv.start_snap_time as hora_inicio,iv.end_snap_time as hora_fin,iv.start_snap_id as inicio, iv.end_snap_id as fin, extract(hour from (iv.end_snap_time    - iv.start_snap_time))*3600 + extract(minute from (iv.end_snap_time - iv.start_snap_time))*60 + extract(second from
      (iv.end_snap_time - iv.start_snap_time))as intervalo
       from
        (SELECT lag(dbid) over (order by dbid, snap_id) AS start_dbid,
        dbid AS end_dbid,lag(snap_id) over (order by dbid, snap_id) AS start_snap_id, snap_id AS end_snap_id,
        lag(end_interval_time) over (order by dbid, snap_id)
        AS start_snap_time, end_interval_time AS end_snap_time,
       lag(startup_time) over (order by dbid,  snap_id)
       AS start_startup_time, startup_time AS end_startup_time
       FROM sys.wrm$_snapshot) iv
      WHERE iv.start_snap_id IS NOT NULL
      AND iv.start_dbid=iv.end_dbid
      AND iv.start_startup_time=iv.end_startup_time) kv, sys.wrh$_sys_time_model s1, sys.wrh$_sys_time_model s2, v$sysstat n
      where
      kv.inicio = s1.snap_id and
      kv.fin = s2.snap_id and
      n.name = 'DB time' and
      s1.stat_id = s2.stat_id and
      s2.stat_id= n.stat_id and
      kv.hora_inicio > trunc(sysdate - 1)
     order by kv.hora_inicio"
   tracking_column => "snap_b_time"
   use_column_value => true
   type => "d"
   schedule => "* * * * *"
}
}

output {
  if [type] == "a" {
    elasticsearch {
      hosts => ["***:9200"]
      index => "esb-rn-%{+DDMMYYYY}"
    }
  }
  if [type] == "b" {
    elasticsearch {
      hosts => ["***:9200"]
      index => "esb-edn-oaoo-%{+DDMMYYYY}"
    }
  }
  if [type] == "c" {
    elasticsearch {
      hosts => ["***:9200"]
      index => "esb-edn-eqt-%{+DDMMYYYY}"
    }
  }
  if [type] == "d" {
    elasticsearch {
      hosts => ["***:9200"]
      index => "db-stats-%{+DDMMYYYY}"
    }
  }
}

Bumping to get someone from Elastic to weigh in on this.

Hmmm. I would potentially recommend launching those in individual pipelines. In Logstash 5, that means a different Logstash instance for each jdbc input plugin instance. In Logstash 6, that will mean different entries in the new pipelines.yml file.
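
For the Logstash 6 case, a minimal sketch of what those pipelines.yml entries might look like, one per jdbc input (the pipeline IDs and config paths here are illustrative, not from the original setup):

- pipeline.id: jdbc-rn
  path.config: "/opt/logstash/conf.d/jdbc-rn.conf"
- pipeline.id: jdbc-edn-oaoo
  path.config: "/opt/logstash/conf.d/jdbc-edn-oaoo.conf"
- pipeline.id: jdbc-edn-eqt
  path.config: "/opt/logstash/conf.d/jdbc-edn-eqt.conf"
- pipeline.id: jdbc-db-stats
  path.config: "/opt/logstash/conf.d/jdbc-db-stats.conf"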

Hi Aaron,

Thanks for the suggestion.

As per your recommendation, I created 3 extra instances of logstash-5.5.0, named -1, -2, -3.
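
Since each copy is a full Logstash install, each keeps its own data directory and they can run concurrently. They're started along these lines (the config file names here are placeholders):

/opt/logstash-5.5.0-1/bin/logstash -f instance-1.conf
/opt/logstash-5.5.0-2/bin/logstash -f instance-2.conf
/opt/logstash-5.5.0-3/bin/logstash -f instance-3.conf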

In instance -1

input {
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select 'AVL' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_avl
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Unit' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_unit_status
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Event' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_event
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
order by 2 desc"
   tracking_column => "rec_date"
   use_column_value => true
   type => "a"
   schedule => "* * * * *"
}
}

output {
  if [type] == "a" {
    elasticsearch {
      hosts => ["slv-eh-dba10:9200"]
      index => "esb-rn-%{+DDMMYYYY}"
    }
  }
}

In instance -2:

input {
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6-1.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select enq_time, q_name, user_Data from wlstest_soainfra.edn_oaoo_delivery_table"
   type => "b"
   schedule => "* * * * *"
}
}
output {
  if [type] == "b" {
    elasticsearch {
      hosts => ["slv-eh-dba10:9200"]
      index => "esb-edn-oaoo-%{+DDMMYYYY}"
    }
  }
}

After setting these up and starting them, instance -1 is OK, but instance -2 fails in exactly the same way. I also copied the driver into the second instance as a separate jar (ojdbc6-1.jar) to rule the driver itself out.

I'm happy to try any other suggestions you might have.

Cheers!

You haven't mentioned whether the other 2 instances are working concurrently with instance 1 or not. Does the problematic instance (seemingly instance 2) work okay when it is the only instance running?
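
A quick way to test that is to run the suspect input on its own against a stdout output, so you can see exactly what each column comes back as (a sketch, with the connection details masked as elsewhere in this thread):

input {
  jdbc {
    jdbc_connection_string => "jdbc:oracle:thin:@***/***"
    jdbc_user => "***"
    jdbc_password => "***"
    jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    statement => "select enq_time, q_name, user_Data from wlstest_soainfra.edn_oaoo_delivery_table"
  }
}
output {
  # Print each event instead of indexing it, making it easy to
  # spot a field that holds something other than a plain value.
  stdout { codec => rubydebug }
}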

Hi Aaron,

Spot on with your suggestion - instance 2 was causing it to misbehave. It turns out that user_data in the instance 2 config is a CLOB, which is what it was falling over on. A good lesson in knowing your data types.
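
For anyone who hits the same error: assuming the column really is a plain CLOB, one workaround is to cast it to a VARCHAR2 inside the query itself, so the jdbc input hands Logstash a plain string rather than an Oracle LOB object:

   statement => "select enq_time, q_name, dbms_lob.substr(user_data, 4000, 1) as user_data from wlstest_soainfra.edn_oaoo_delivery_table"

Note that dbms_lob.substr truncates at 4000 characters (the VARCHAR2 limit in a SQL context), so larger payloads would need different handling.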

I'll attempt changing it back to a single pipeline and see how it goes, as this multiple-pipeline configuration on a single host is pushing the limits of available memory.

Thanks again for all of your assistance in resolving this.

Cheers!
