Logstash - input filter ruby. Persisted vs memory queue: different way to store a blob column from Oracle

Hello everyone.

I have a Logstash pipeline that reads a Blob column named INPUT_MESSAGE from an Oracle database.
In pipelines.yml I was using this configuration for the pipeline:

queue.type: memory

In the pipeline.config file, my ruby filter handles the Blob input correctly like this:

ruby {
  init => "require 'zlib'; require 'stringio'"
  code => "
    message = event.get('input_message')
    unless message.nil?
      # A leading byte of 1 or 4 marks the rest of the payload as gzip-compressed
      if (message.bytes[0] == 1 || message.bytes[0] == 4)
        messageContentReader = Zlib::GzipReader.new(StringIO.new(message[1...message.length]))
        messageContent = messageContentReader.read
      else
        # Otherwise the payload after the first byte is used as-is
        messageContent = message[1...message.length].to_s
      end
      event.set('input_message', messageContent)
      event.set('input_message_size', messageContent.bytesize)
    end
  "
}

I added an info-level log statement to track the value of the variable "message". For example, it is:

#<Sequel::SQL::Blob:0x7e0 bytes=89 start="\x01\x1F\x8B\b\x00\x00\x00\x00\x00\x04" end="\x01\x004\x88@\xA6Y\x00\x00\x00">

However, due to project requirements, I had to switch to a persisted queue configuration:

queue.type: persisted

In this case, Logstash stores the column value differently, and the value of the variable "message" turns out to be:


So, in my input filter I receive the following error:
[ERROR][logstash.filters.ruby ] Ruby exception occurred: not in gzip format
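
A minimal diagnostic sketch in plain Ruby, not a confirmed fix. It assumes that, because the persisted queue serializes events to disk, the value may reach the filter as something other than the original Sequel::SQL::Blob, possibly with a different encoding. The helper names decode_payload and describe_value are hypothetical. The first treats the value as raw bytes and only decompresses when the gzip magic bytes (0x1F 0x8B) follow the flag byte; the second reports what the filter actually received.

```ruby
require 'zlib'
require 'stringio'

GZIP_MAGIC = [0x1F, 0x8B].freeze

# Hypothetical helper: decode a payload whose first byte is a flag
# (1 or 4 = gzip, as in the filter above). Returns nil when the flag
# says gzip but the bytes that follow do not start with the gzip magic.
def decode_payload(raw)
  bytes = raw.to_s.dup.force_encoding(Encoding::BINARY)
  flag  = bytes.getbyte(0)
  body  = bytes.byteslice(1, bytes.bytesize - 1) || ''
  if [1, 4].include?(flag)
    return nil unless body.bytes.first(2) == GZIP_MAGIC
    Zlib::GzipReader.new(StringIO.new(body)).read
  else
    body
  end
end

# Hypothetical helper: summarize the value for a log line.
def describe_value(raw)
  s = raw.to_s
  "class=#{raw.class} encoding=#{s.encoding} first_bytes=#{s.bytes.first(4).inspect}"
end
```

Logging describe_value(event.get('input_message')) under both queue types would show whether the leading bytes still match the \x01\x1F\x8B prefix seen with the memory queue. If they do not, the bytes were altered before reaching the filter, and no change inside the filter alone can recover them.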

Does anyone know how I can convert the message to the expected format, or change the way in which Logstash stores this field in the queue?

Thank you!
