Processing Multiple Rows From JDBC Streaming

I'm querying a database to get the questions asked in a ticket. The query fetches all questions asked for a given ticket id, so a single query may return multiple rows. I'd like to parse them all into separate fields on the record. The query and results would be something like:

SELECT question, answer, ticketID FROM tickets

ticketID  question  answer
2114      2         tonight
2114      4         tomorrow
2114      0         never

Then they'd look something like below in Elasticsearch

field       value
question.0  never
question.2  tonight
question.4  tomorrow

I would assume the jdbc_streaming filter would take the results and put them in an array. Would I just use mutate to split them out into individual fields?

Went ahead and ran the query, and some good and bad things happened. First, the results were stuck in an object. I believe this to be good, and I can use the json filter to parse it.

{
  "integeranswer": null,
  "questionid": 2,
  "dateanswer": null,
  "textanswer": null,
  "decimalanswer": null,
  "booleananswer": true
},
{
  "integeranswer": null,
  "questionid": 4,
  "dateanswer": null,
  "textanswer": null,
  "decimalanswer": null,
  "booleananswer": false
}

However, when I configured the json filter, only specifying the target, I got an error and the pipeline shut down. Any ideas what this error means?

[logstash.javapipeline    ] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"example", :error=>"Missing Converter handling for full class name=org.jruby.ext.date.RubyDateTime, simple name=RubyDateTime", :exception=>Java::OrgLogstash::MissingConverterException, :backtrace=>["org.logstash.Valuefier.fallbackConvert(Valuefier.java:118)", "org.logstash.Valuefier.convert(Valuefier.java:96)", "org.logstash.ConvertedMap$1.visit(ConvertedMap.java:55)", "org.logstash.ConvertedMap$1.visit(ConvertedMap.java:49)", "org.jruby.RubyHash.visitLimited(RubyHash.java:698)", "org.jruby.RubyHash.visitAll(RubyHash.java:683)", "org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:89)", "org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:84)", "org.logstash.Valuefier.lambda$initConverters$12(Valuefier.java:171)", "org.logstash.Valuefier.convert(Valuefier.java:94)", "org.logstash.ConvertedList.newFromRubyArray(ConvertedList.java:64)", "org.logstash.Valuefier.lambda$initConverters$16(Valuefier.java:183)", "org.logstash.Valuefier.convert(Valuefier.java:94)", "org.logstash.ext.JrubyEventExtLibrary$RubyEvent.safeValueifierConvert(JrubyEventExtLibrary.java:355)", "org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:121)", "D_3a_.Logstash.$7_dot_13_dot_3.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_0_dot_7.lib.logstash.filters.jdbc_streaming.RUBY$method$process_event$0(D:/Logstash/7.13.3/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.7/lib/logstash/filters/jdbc_streaming.rb:163)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:119)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:175)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:290)", "D_3a_.Logstash.$7_dot_13_dot_3.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_0_dot_7.lib.logstash.filters.jdbc_streaming.RUBY$method$filter$0(D:/Logstash/7.13.3/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.7/lib/logstash/filters/jdbc_streaming.rb:136)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:106)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:140)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:267)", "D_3a_.Logstash.$7_dot_13_dot_3.logstash_minus_core.lib.logstash.filters.base.RUBY$method$do_filter$0(D:/Logstash/7.13.3/logstash-core/lib/logstash/filters/base.rb:159)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:106)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:140)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:267)", "D_3a_.Logstash.$7_dot_13_dot_3.logstash_minus_core.lib.logstash.filters.base.RUBY$block$multi_filter$1(D:/Logstash/7.13.3/logstash-core/lib/logstash/filters/base.rb:178)", "org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:148)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:106)", "org.jruby.runtime.Block.yield(Block.java:184)", "org.jruby.RubyArray.each(RubyArray.java:1809)", "D_3a_.Logstash.$7_dot_13_dot_3.logstash_minus_core.lib.logstash.filters.base.RUBY$method$multi_filter$0(D:/Logstash/7.13.3/logstash-core/lib/logstash/filters/base.rb:175)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:106)", 
"org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:140)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:200)", "org.logstash.config.ir.compiler.FilterDelegatorExt.doMultiFilter(FilterDelegatorExt.java:127)", "org.logstash.config.ir.compiler.AbstractFilterDelegatorExt.multiFilter(AbstractFilterDelegatorExt.java:134)", "org.logstash.generated.CompiledDataset9.compute(Unknown Source)", "org.logstash.generated.CompiledDataset4.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset5.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset4.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset6.compute(Unknown Source)", "org.logstash.generated.CompiledDataset2.compute(Unknown Source)", "org.logstash.generated.CompiledDataset5.compute(Unknown Source)", "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:329)", "org.logstash.config.ir.CompiledPipeline$CompiledUnorderedExecution.compute(CompiledPipeline.java:323)", "org.logstash.execution.WorkerLoop.run(WorkerLoop.java:87)", "jdk.internal.reflect.GeneratedMethodAccessor131.invoke(Unknown Source)", "java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "java.base/java.lang.reflect.Method.invoke(Method.java:566)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:441)", "org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:305)", "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)", 
"D_3a_.Logstash.$7_dot_13_dot_3.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_workers$5(D:/Logstash/7.13.3/logstash-core/lib/logstash/java_pipeline.rb:295)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)", "org.jruby.runtime.Block.call(Block.java:139)", "org.jruby.RubyProc.call(RubyProc.java:318)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)", "java.base/java.lang.Thread.run(Thread.java:834)"], :thread=>"#<Thread:0x3d043597 sleep>"}

A jdbc_streaming filter returns an array of hashes (it has to be an array because the result set may have multiple rows, and they have to be hashes because you can fetch multiple columns). You do not need to parse this as JSON.

You can use a split filter to separate the array into multiple events.
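For example, a minimal sketch of the two filters together, assuming the result set is written to a field named jdbcOutput (the connection settings and statement below are placeholders, not taken from this thread):

filter {
  jdbc_streaming {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/tickets" # placeholder
    jdbc_user              => "logstash"                                 # placeholder
    jdbc_driver_class      => "org.postgresql.Driver"                    # placeholder
    statement  => "select * from answers where ticketid = :id"           # placeholder query
    parameters => { "id" => "ticketID" }
    target     => "jdbcOutput"  # the array of hashes lands here
  }
  split {
    field => "jdbcOutput"  # one event per element of the array
  }
}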

I configured it to use split... but it looks like that's going to produce unfavorable results. The test record I've done it on only has one question, but it appears as though it will end up creating an array. Is it possible to set the target using split to something like target => "[question][%{questionid}]"? The intent would be to stick all the fields in a separate object under each question id, so like:

questionid2.integeranswer
questionid2.dateanswer
questionid2.textanswer
questionid2.decimalanswer
questionid2.booleananswer
questionid4.integeranswer
questionid4.dateanswer
questionid4.textanswer
questionid4.decimalanswer
questionid4.booleananswer

No, but you can reformat the array of hashes and change

 [ { "questionid": 2, "integeranswer": null, ... },
{ "questionid": 4, "integeranswer": null, ... } ]

into

[ { "questionid2": { "integeranswer": null, ... } },
{ "questionid4": {"integeranswer": null, ... } } ]

using a ruby filter.

ruby {
    code => '
        data = event.get("jdbcOutput")
        if data.is_a? Array
            newData = []
            data.each { |h|
                # pull questionid out of each hash and use it to build
                # the key that the rest of the hash nests under
                q = h.delete("questionid")
                newData << { "question#{q}" => h }
            }
            event.set("jdbcOutput", newData)
        end
    '
}

then split that.

At least I was researching the right path, using ruby to manipulate the document. This is well above anything I remotely understand with Ruby. Was I supposed to modify it, like changing the event.get from jdbcOutput to my actual field name, or just copy/paste? I copy/pasted it and nothing happens... no errors, no change in field structure.

You would need to update the event.get and .set calls to use the name of the field that your jdbc_streaming filter fetched data into.
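For example, if your jdbc_streaming filter fetched data into a (hypothetical) field named ticket_questions, the first and last calls would become:

        data = event.get("ticket_questions")
        # ... loop unchanged ...
        event.set("ticket_questions", newData)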

Alright, that does the bulk of the work, thank you. However, when splitting the fields out, they're all under question, so the fields look like question.question2.integeranswer. Is there a modification to this Ruby code to make it just question2.integeranswer, or do I need to throw in an additional ruby script after this to modify them all?

Perhaps replacing the loop in the previous filter with

        data.each { |h|
            q = h.delete("questionid")
            event.set("question#{q}", h)
        }

would work better for you. The final event.set is no longer needed, and you could change the event.get to event.remove (event.remove returns the value it removes, so it reads the array and deletes the original field in one call).
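Assembled into a complete filter (still assuming the field name jdbcOutput from the earlier sketch), that would look like:

ruby {
    code => '
        # event.remove returns the value it removes, so this reads the
        # array and drops the original field in one step
        data = event.remove("jdbcOutput")
        if data.is_a? Array
            data.each { |h|
                q = h.delete("questionid")
                event.set("question#{q}", h)
            }
        end
    '
}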

Or, if you need the array so that you can do the split, then move the data afterwards using a ruby filter like this one.

That did it, I appreciate the assistance. I've said it before: I need to find time to learn ruby so I can do this kind of stuff myself... or at the very least understand what the hell is going on here.

Follow-up question: can this ruby script also be used to remove a field if the value is null? I've tried an additional ruby script I've used previously to remove empty fields, but it doesn't work on these fields; I'm assuming that's because they're nested.

ruby {
    code => '
        # remove top-level fields whose value is an empty string
        event.to_hash.each { |k, v|
            if v.kind_of? String
                if v == ""
                    event.remove(k)
                end
            end
        }
    '
}

That is correct. That ruby filter only checks the top-level fields. This is an example of ruby code that recursively descends into fields of an event and modifies them.

I'm very much lost in understanding most of this, but I THINK I changed it to remove null fields? Although I think I'm missing something, because I don't see where a target field is referenced... is it looking at the entire event or just the message field?
I'm not quite sure how to interpret what it's doing after the first couple of logic checks. Define EmptyField; then, if the object exists, and the object is a hash and not empty, then for each entry in the hash do... something...

ruby {
        code => '
            def EmptyField(object, name, event)
                if object
                    if object.kind_of?(Hash) and object != {}
                        object.each { |k, v| EmptyField(v, "#{name}[#{k}]", event) }
                    elsif object.kind_of?(Array) and object != []
                        object.each_index { |i|
                            EmptyField(object[i], "#{name}[#{i}]", event)
                        }
                    else
                        lastElement = name.gsub(/^.*\[/, "").gsub(/\]$/, "")
                        if lastElement.length = "null"
                            event.remove(name)
                        end
                    end
                end
            end

            event.to_hash.each { |k, v|
                EmptyField(v, "[#{k}]", event)
            }
        '
    }

No...this did not work...appears to remove everything in the event, lol.

That is never going to be true, so yes, it will delete everything. I think you can replace

                    lastElement = name.gsub(/^.*\[/, "").gsub(/\]$/, "")
                    if lastElement.length = "null"

with

                    if object = "null"

I did catch that .length and removed it before testing it out. I made the change you suggested, but it still removes the whole event.

      ruby {
              code => '
                  def EmptyField(object, name, event)
                      if object
                          if object.kind_of?(Hash) and object != {}
                              object.each { |k, v| EmptyField(v, "#{name}[#{k}]", event) }
                          elsif object.kind_of?(Array) and object != []
                              object.each_index { |i|
                                  EmptyField(object[i], "#{name}[#{i}]", event)
                              }
                          else
                              if object = "null"
                                  event.remove(name)
                              end
                          end
                      end
                  end
      
                  event.to_hash.each { |k, v|
                      EmptyField(v, "[#{k}]", event)
                  }
              '
          }

I'm running this AFTER the first ruby script, the one that restructures the object into the shape below. Should it be running first?

"question": {
  "question2": {
    "textanswer": null
  },
  "question4": {
    "textanswer": null
  }
}

Sorry, that is an assignment so it is unconditionally true. Try if object == "null". Note that null in JSON will be nil in Ruby, so you may need if ! object instead.

I would run it after the restructuring.

Tried if object == "null", if object == "nil", and if ! object and none of them worked. They didn't delete the event, but they didn't seem to do anything either.

What does an event look like if you use

output { stdout { codec => rubydebug } }

(Doesn't have to be stdout, you could use a file output if it is more convenient.)
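For example, a file output sketch (the path here is a placeholder):

output {
  file {
    path  => "/tmp/logstash-debug.log"
    codec => rubydebug
  }
}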

{
	"question39": {
		"dec": null,
		"question": "Sup Brah?",
		"bool": null,
		"date": null,
		"text": null,
		"int": null
	}
}
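If those values are coming through as Ruby nil (which is how JSON null arrives), note that the outer if object test in EmptyField is falsy for nil, so nil values never reach the removal branch at all; that would explain why none of the comparisons appeared to do anything. Here is a minimal sketch with that guard removed and the comparison replaced by a nil check (the method name remove_nil_fields is mine, not from the earlier posts):

ruby {
    code => '
        # no outer "if object" guard here, so nil values are not
        # skipped before the nil check can run
        def remove_nil_fields(object, name, event)
            if object.kind_of?(Hash) and object != {}
                object.each { |k, v| remove_nil_fields(v, "#{name}[#{k}]", event) }
            elsif object.kind_of?(Array) and object != []
                object.each_index { |i|
                    remove_nil_fields(object[i], "#{name}[#{i}]", event)
                }
            elsif object.nil?
                event.remove(name)
            end
        end

        event.to_hash.each { |k, v|
            remove_nil_fields(v, "[#{k}]", event)
        }
    '
}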