Hi,
I have created a custom Logstash Ruby filter to decode data coming from a Kafka queue.
Below is the Ruby code snippet. It calls the Java method "@decoderObj.decodeData" to decode the data.
public
def register
  @decoderObj = Java::DecoderClass.new
end # def register

public
def filter(event)
  # Replace the message with the decoded value returned by the Java decoder
  event.set("message", @decoderObj.decodeData(event))
  @logger.debug? && @logger.debug("Message is now: #{event.get("message")}")
  filter_matched(event)
end # def filter
And here is the Java method 'decodeData' of class 'DecoderClass':
public String decodeData(org.logstash.ext.JrubyEventExtLibrary.RubyEvent re)
{
    try {
        String sourceString = (String) re.getEvent().getField("message");
        byte[] source = sourceString.getBytes();
I cannot get the raw data as a byte array, the way it arrives from Kafka. When I read the message from the event object, I get a String field.
When I convert that message string back to a byte array, my data gets corrupted. How can I get the byte[]
exactly as it arrives from Kafka?
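To illustrate what I think is happening, here is a minimal standalone sketch (not my actual decoder). The class name RoundTripDemo and the sample payload are made up for illustration, and I am assuming the message was decoded as UTF-8 somewhere before it reached my filter. It shows that decoding arbitrary bytes into a String and encoding them again does not necessarily give back the original bytes:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class, not part of Logstash or of my plugin.
public class RoundTripDemo {

    // Decode the bytes into a String with the given charset, then encode back.
    static byte[] roundTrip(byte[] raw, Charset charset) {
        return new String(raw, charset).getBytes(charset);
    }

    public static void main(String[] args) {
        // Made-up binary payload: 0xFF and 0xFE are not valid UTF-8 sequences.
        byte[] raw = {(byte) 0xFF, (byte) 0xFE, 0x00, 0x41};

        // Invalid sequences are replaced with U+FFFD while decoding,
        // so the re-encoded bytes differ from the input.
        byte[] viaUtf8 = roundTrip(raw, StandardCharsets.UTF_8);

        // ISO-8859-1 maps every byte value to exactly one character,
        // so this round trip preserves the input.
        byte[] viaLatin1 = roundTrip(raw, StandardCharsets.ISO_8859_1);

        System.out.println("UTF-8 round trip intact:      " + Arrays.equals(raw, viaUtf8));
        System.out.println("ISO-8859-1 round trip intact: " + Arrays.equals(raw, viaLatin1));
    }
}
```

In this sketch the UTF-8 round trip reports false and the ISO-8859-1 round trip reports true. Note also that getBytes() without an argument, as in my method above, uses the platform default charset, which may differ from the charset used when the String was created.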
Can anyone help me with this?
Input/output configuration used:
bin/logstash -e 'input { kafka { bootstrap_servers => "kafka:9092" topics => ["test-topic"] } } filter { example {} } output {stdout { codec => rubydebug }}'