Not able to process binary data (coming from Kafka) in a Logstash custom Ruby filter

Hi,

I have created a custom Logstash Ruby filter to decode data coming from a Kafka topic.

Below is the Ruby snippet of my filter. Here I am calling the Java method "decodeData" on "@decoderObj" to decode the data.

  public
  def register
    # Instantiate the Java decoder once, when the filter is registered
    @decoderObj = Java::DecoderClass.new
  end # def register

  public
  def filter(event)
    # Hand the whole event to the Java decoder and write the result back
    event.set("message", @decoderObj.decodeData(event))
    @logger.debug? && @logger.debug("Message is now: #{event.get("message")}")
    filter_matched(event)
  end # def filter

And here is my Java method "decodeData" of the class "DecoderClass":

   public String decodeData(org.logstash.ext.JrubyEventExtLibrary.RubyEvent re)
   {
      try {
         // getField("message") returns a String, not the raw byte[] from Kafka
         String sourceString = (String) re.getEvent().getField("message");
         // getBytes() re-encodes that String using the platform default charset
         byte[] source = sourceString.getBytes();

I could not get the raw data as it comes from Kafka, i.e. as a byte array; when I read the "message" field from the event object, I get a String.

And when I convert that message String back to a byte array, my data gets corrupted. How can I get the byte[] exactly as it arrives from Kafka?
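
As far as I can tell, the corruption comes from a charset round trip: the bytes get decoded into a String (as UTF-8, I assume) somewhere before my filter, invalid sequences are replaced, and getBytes() then cannot restore the original values, whereas an ISO-8859-1 round trip would be lossless. A minimal standalone Java sketch of that effect, with made-up byte values:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class CharsetRoundTrip {
        public static void main(String[] args) {
            // Example binary payload; 0x81 and 0xFE are not valid UTF-8
            byte[] original = { (byte) 0x81, (byte) 0xFE, 0x10, 0x42 };

            // Decoding as UTF-8 replaces invalid bytes with U+FFFD ...
            String asUtf8 = new String(original, StandardCharsets.UTF_8);
            // ... so encoding back does NOT restore the original bytes
            byte[] viaUtf8 = asUtf8.getBytes(StandardCharsets.UTF_8);

            // ISO-8859-1 maps every byte value 1:1 to a code point,
            // so this round trip is lossless
            String asLatin1 = new String(original, StandardCharsets.ISO_8859_1);
            byte[] viaLatin1 = asLatin1.getBytes(StandardCharsets.ISO_8859_1);

            System.out.println("original:           " + Arrays.toString(original));
            System.out.println("UTF-8 round trip:   " + Arrays.toString(viaUtf8));   // corrupted
            System.out.println("Latin-1 round trip: " + Arrays.toString(viaLatin1)); // intact
        }
    }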

Can anyone help me with this?

Input/output configuration used:

bin/logstash -e 'input { kafka { bootstrap_servers => "kafka:9092" topics => ["test-topic"] } } filter { example {} } output { stdout { codec => rubydebug } }'
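
Looking at the kafka input plugin options, there are two things I was going to try next (untested, so I am not sure either actually preserves the bytes end to end): switching value_deserializer_class to Kafka's ByteArrayDeserializer, or keeping the string path but forcing an ISO-8859-1 charset on the codec so the String can be converted back to the original bytes:

    input {
      kafka {
        bootstrap_servers => "kafka:9092"
        topics => ["test-topic"]
        # Option 1: hand the raw bytes to the plugin instead of a String
        # (default is org.apache.kafka.common.serialization.StringDeserializer)
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        # Option 2: keep the StringDeserializer but decode with ISO-8859-1,
        # which maps bytes 1:1 so getBytes(ISO-8859-1) restores them
        # codec => plain { charset => "ISO-8859-1" }
      }
    }
    filter { example {} }
    output { stdout { codec => rubydebug } }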
