Multiple JDBC Inputs in Single Logstash Config File - Unknown Error


(Scott Duncan) #1

Hello,

I've had limited success running multiple (4 concurrent) SQL queries against an Oracle database and pushing their data across to my Elasticsearch server.

When these queries run once a minute, the first 1-2 queries complete and insert their data into Elasticsearch; however, the remaining queries do not, and Logstash reports the following error:

13:47:00.588 [[main]-pipeline-manager] INFO  logstash.pipeline - Pipeline main started
13:47:00.706 [Api Webserver] INFO  logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
13:48:01.590 [Ruby-0-Thread-28: /opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO  logstash.inputs.jdbc - (0.743000s) SELECT kv.hora_inicio as snap_B_time, kv.hora_fin as snap_E_time ,kv.inicio as start_sn_id,kv.fin as end_start_id, (s2.value - s1.value)/(kv.intervalo*1000000) as ASL  from
     (select iv.start_snap_time as hora_inicio,iv.end_snap_time as hora_fin,iv.start_snap_id as inicio, iv.end_snap_id as fin, extract(hour from (iv.end_snap_time    - iv.start_snap_time))*3600 + extract(minute from (iv.end_snap_time - iv.start_snap_time))*60 + extract(second from
      (iv.end_snap_time - iv.start_snap_time))as intervalo
       from
        (SELECT lag(dbid) over (order by dbid, snap_id) AS start_dbid,
        dbid AS end_dbid,lag(snap_id) over (order by dbid, snap_id) AS start_snap_id, snap_id AS end_snap_id,
        lag(end_interval_time) over (order by dbid, snap_id)
        AS start_snap_time, end_interval_time AS end_snap_time,
       lag(startup_time) over (order by dbid,  snap_id)
       AS start_startup_time, startup_time AS end_startup_time
       FROM sys.wrm$_snapshot) iv
      WHERE iv.start_snap_id IS NOT NULL
      AND iv.start_dbid=iv.end_dbid
      AND iv.start_startup_time=iv.end_startup_time) kv, sys.wrh$_sys_time_model s1, sys.wrh$_sys_time_model s2, v$sysstat n
      where
      kv.inicio = s1.snap_id and
      kv.fin = s2.snap_id and
      n.name = 'DB time' and
      s1.stat_id = s2.stat_id and
      s2.stat_id= n.stat_id and
      kv.hora_inicio > trunc(sysdate - 1)
     order by kv.hora_inicio
13:48:01.690 [Ruby-0-Thread-25: /opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO  logstash.inputs.jdbc - (0.842000s) select enq_time, q_name from WLSTEST_SOAINFRA.EDN_EVENT_QUEUE_TABLE
13:48:01.759 [Ruby-0-Thread-26: /opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO  logstash.inputs.jdbc - (0.979000s) select 'AVL' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_avl
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Unit' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_unit_status
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Event' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_event
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
order by 2 desc
13:48:02.078 [Ruby-0-Thread-27: /opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO  logstash.inputs.jdbc - (1.232000s) select enq_time, q_name, user_Data from wlstest_soainfra.edn_oaoo_delivery_table
13:48:02.758 [[main]>worker0] ERROR logstash.outputs.elasticsearch - An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely {:error_message=>"Direct self-reference leading to cycle (through reference chain: oracle.sql.STRUCT[\"descriptor\"]->oracle.sql.StructDescriptor[\"pickler\"]->oracle.jdbc.oracore.OracleTypeADT[\"connection\"]->oracle.jdbc.driver.T4CConnection[\"wrapper\"])", :error_class=>"LogStash::Json::GeneratorError", :backtrace=>["/opt/logstash-5.5.0/logstash-core/lib/logstash/json.rb:52:in `jruby_dump'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/http_client.rb:116:in `bulk'", "org/jruby/RubyArray.java:2414:in `map'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/http_client.rb:115:in `bulk'", "org/jruby/RubyArray.java:1613:in `each'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/http_client.rb:114:in `bulk'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/common.rb:225:in `safe_bulk'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/common.rb:123:in `submit'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/common.rb:91:in `retrying_submit'", "/opt/logstash-5.5.0/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.3.6-java/lib/logstash/outputs/elasticsearch/common.rb:42:in `multi_receive'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:13:in `multi_receive'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/output_delegator.rb:47:in `multi_receive'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/pipeline.rb:420:in `output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/pipeline.rb:419:in `output_batch'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/pipeline.rb:365:in `worker_loop'", "/opt/logstash-5.5.0/logstash-core/lib/logstash/pipeline.rb:330:in `start_workers'"]}

To assist with troubleshooting, I've replied to this post with my Logstash configuration.

From what I can tell, it appears to get caught in the rufus-scheduler, but I'm open to any suggestions on how to resolve it.

Thanks in advance.


(Scott Duncan) #2
input {
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***:1521/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select 'AVL' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_avl
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Unit' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_unit_status
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Event' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_event
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
order by 2 desc"
   tracking_column => "rec_date"
   use_column_value => true
   type => "a"
   schedule => "* * * * *"
}
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***:1521/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select enq_time, q_name, user_Data from wlstest_soainfra.edn_oaoo_delivery_table"
   tracking_column => "enq_time"
   use_column_value => true
   type => "b"
   schedule => "* * * * *"
}
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***:1521/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select enq_time, q_name from WLSTEST_SOAINFRA.EDN_EVENT_QUEUE_TABLE"
   tracking_column => "enq_time"
   clean_run => true
   type => "c"
   schedule => "* * * * *"
}
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***:1521/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "SELECT kv.hora_inicio as snap_B_time, kv.hora_fin as snap_E_time ,kv.inicio as start_sn_id,kv.fin as end_start_id, (s2.value - s1.value)/(kv.intervalo*1000000) as ASL  from
     (select iv.start_snap_time as hora_inicio,iv.end_snap_time as hora_fin,iv.start_snap_id as inicio, iv.end_snap_id as fin, extract(hour from (iv.end_snap_time    - iv.start_snap_time))*3600 + extract(minute from (iv.end_snap_time - iv.start_snap_time))*60 + extract(second from
      (iv.end_snap_time - iv.start_snap_time))as intervalo
       from
        (SELECT lag(dbid) over (order by dbid, snap_id) AS start_dbid,
        dbid AS end_dbid,lag(snap_id) over (order by dbid, snap_id) AS start_snap_id, snap_id AS end_snap_id,
        lag(end_interval_time) over (order by dbid, snap_id)
        AS start_snap_time, end_interval_time AS end_snap_time,
       lag(startup_time) over (order by dbid,  snap_id)
       AS start_startup_time, startup_time AS end_startup_time
       FROM sys.wrm$_snapshot) iv
      WHERE iv.start_snap_id IS NOT NULL
      AND iv.start_dbid=iv.end_dbid
      AND iv.start_startup_time=iv.end_startup_time) kv, sys.wrh$_sys_time_model s1, sys.wrh$_sys_time_model s2, v$sysstat n
      where
      kv.inicio = s1.snap_id and
      kv.fin = s2.snap_id and
      n.name = 'DB time' and
      s1.stat_id = s2.stat_id and
      s2.stat_id= n.stat_id and
      kv.hora_inicio > trunc(sysdate - 1)
     order by kv.hora_inicio"
   tracking_column => "snap_b_time"
   use_column_value => true
   type => "d"
   schedule => "* * * * *"
}
}

output {
  if [type] == "a" {
    elasticsearch {
      hosts => ["***:9200"]
      index => "esb-rn-%{+DDMMYYYY}"
    }
  }
  if [type] == "b" {
    elasticsearch {
      hosts => ["***:9200"]
      index => "esb-edn-oaoo-%{+DDMMYYYY}"
    }
  }
  if [type] == "c" {
    elasticsearch {
      hosts => ["***:9200"]
      index => "esb-edn-eqt-%{+DDMMYYYY}"
    }
  }
  if [type] == "d" {
    elasticsearch {
      hosts => ["***:9200"]
      index => "db-stats-%{+DDMMYYYY}"
    }
  }
}

(Scott Duncan) #3

Bumping to get someone from Elastic to weigh in on this.


(Aaron Mildenstein) #4

Hmmm. I would potentially recommend launching those in individual pipelines. In Logstash 5, that means a different Logstash instance for each jdbc input plugin instance. In Logstash 6, that will mean different entries in the new pipelines.yml file.
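For reference, a minimal sketch of what those pipelines.yml entries might look like in Logstash 6 (the pipeline ids and config paths below are hypothetical, just to show the shape):

# pipelines.yml -- ids and paths are illustrative, not taken from this thread
- pipeline.id: jdbc-rn
  path.config: "/opt/logstash/conf.d/jdbc-rn.conf"
- pipeline.id: jdbc-edn-oaoo
  path.config: "/opt/logstash/conf.d/jdbc-edn-oaoo.conf"
- pipeline.id: jdbc-edn-eqt
  path.config: "/opt/logstash/conf.d/jdbc-edn-eqt.conf"
- pipeline.id: jdbc-db-stats
  path.config: "/opt/logstash/conf.d/jdbc-db-stats.conf"

In Logstash 5, each extra instance will also need its own path.data (e.g. bin/logstash -f <config> --path.data <dir>), otherwise the instances will collide on the data directory lock.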


(Scott Duncan) #5

Hi Aaron,

Thanks for the suggestion.

As per your recommendation, I created three extra instances of logstash-5.5.0, named -1, -2, and -3.

In instance -1

input {
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select 'AVL' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_avl
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Unit' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_unit_status
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
union
select 'Event' Source_Loc,trunc(record_date, 'HH24') Rec_Date, source_flag, count(*) from ods_event
where record_date > trunc(sysdate) group by trunc(record_date, 'HH24'), source_flag
order by 2 desc"
   tracking_column => "rec_date"
   use_column_value => true
   type => "a"
   schedule => "* * * * *"
}
}

output {
  if [type] == "a" {
    elasticsearch {
      hosts => ["slv-eh-dba10:9200"]
      index => "esb-rn-%{+DDMMYYYY}"
    }
  }
}

In instance -2:

input {
jdbc {
   jdbc_connection_string => "jdbc:oracle:thin:@***/***"
   jdbc_user => "***"
   jdbc_password => "***"
   jdbc_validate_connection => true
   jdbc_driver_library => "/opt/logstash-5.5.0/ojdbc6-1.jar"
   jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
   statement => "select enq_time, q_name, user_Data from wlstest_soainfra.edn_oaoo_delivery_table"
   type => "b"
   schedule => "* * * * *"
}
}
output {
  if [type] == "b" {
    elasticsearch {
      hosts => ["slv-eh-dba10:9200"]
      index => "esb-edn-oaoo-%{+DDMMYYYY}"
    }
  }
}

After setting these up and starting them, instance -1 is OK; however, instance -2 does the same thing. I've also given the second instance its own copy of the driver (ojdbc6-1.jar) to rule out contention on the jar as well.

I'm happy to try any other suggestions you might have.

Cheers!


(Aaron Mildenstein) #6

You haven't mentioned whether the other 2 instances are working concurrently with instance 1 or not. Does the problematic instance (seemingly instance 2) work okay when it is the only instance running?


(Scott Duncan) #7

Hi Aaron,

Spot on with your suggestion: instance 2 was the one misbehaving. It appears that the user_data column in the instance 2 config is a CLOB, which was causing Logstash to fall over. A good lesson in knowing your data types.
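For anyone who hits the same error: one way to avoid it is to cast the CLOB down to a plain string inside the query, so the Oracle LOB object never reaches the JSON serializer. A minimal sketch, assuming user_data really is a CLOB and its first 4000 characters are enough:

   statement => "select enq_time, q_name,
                 dbms_lob.substr(user_data, 4000, 1) as user_data -- assumed cap; adjust as needed
                 from wlstest_soainfra.edn_oaoo_delivery_table"

(dbms_lob.substr returns a VARCHAR2; the 4000-character cap here is an assumption for illustration, not something from my actual config.)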

I'll attempt changing it back to a single pipeline and see how it goes, as running multiple Logstash instances on a single host is pushing the available memory.

Thanks again for all of your assistance in resolving this.

Cheers!


(system) #8

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.