Hi,
I am reading a JSON input file and feeding it to a JSON-to-Avro codec before sending it to a Kafka topic.
The codec takes the schema, registers it with the schema registry, and obtains a schema id. It then writes the schema id followed by the Avro-encoded record data, and the whole output is sent to the Kafka topic.
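For reference, the byte layout I am aiming for is the Confluent wire format: one zero "magic" byte, the 4-byte big-endian schema id, and then the Avro binary encoding of the record. A minimal sketch of how that header is packed and unpacked in Ruby (181 is just an example id):

# Confluent wire format: <magic byte 0> <4-byte big-endian schema id> <Avro binary record>
schema_id = 181                            # example id
header = [0, schema_id].pack("cI>")        # => "\x00\x00\x00\x00\xB5"
magic_byte, id = header.unpack("cI>")      # => [0, 181]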
This is the logstash configuration file:
input {
  file {
    path => '/tmp/logstash_input'
    codec => 'json'
  }
}

output {
  kafka {
    topic_id => 'test_topic'
    codec => avro_schema_registry {
      endpoint => 'http://localhost:8081'
    }
  }
}
I write the following to the input file:
{"value_schema":"{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"request_id\",\"type\":\"string\"},{\"name\":\"activity_id\",\"type\":\"int\"},{\"name\":\"counter\",\"type\":\"int\"},]}","record":{"request_id":"request_1","activity_id":34,"counter":1}}
I can see that the encode function gets executed and the following schema id is logged: 181
However, when I consume the Avro message off the topic with the following command:
./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic test_topic
Kafka complains that it is attempting to look up a schema with a different schema id from the one that was logged...
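One check that can be done from irb, reusing the same schema_registry gem and assuming the client can be constructed the same way as in the codec below, is whether the logged id resolves in the registry at all (181 is just the id from the log above):

require "schema_registry"
require "schema_registry/client"

client = SchemaRegistry::Client.new("http://localhost:8081", nil, nil)
puts client.schema(181)   # should print the registered Event schema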
Can anyone tell if the problem is in the codec code or in the way Kafka is configured?
Below is the Avro codec that I wrote:
# encoding: utf-8
require "avro"
require "schema_registry"
require "schema_registry/client"
require "logstash/codecs/base"
require "logstash/event"
require "logstash/timestamp"
require "logstash/util"
require "json"
require "stringio"

MAGIC_BYTE = 0

class LogStash::Codecs::AvroSchemaRegistry < LogStash::Codecs::Base
  config_name "avro_schema_registry"

  # schema registry endpoint and credentials
  config :endpoint, :validate => :string, :required => true
  config :username, :validate => :string, :default => nil
  config :password, :validate => :string, :default => nil

  public
  def register
    @client = SchemaRegistry::Client.new(endpoint, username, password)
    @schemas = Hash.new
  end

  # Fetch a schema from the registry by id, caching it locally
  def get_schema(schema_id)
    if !@schemas.has_key?(schema_id)
      @schemas[schema_id] = Avro::Schema.parse(@client.schema(schema_id))
    end
    @schemas[schema_id]
  end

  # Register the schema under the subject and return the assigned schema id
  def add_schema(schema_json)
    subject = @client.subject('test.schema_registry')
    subject.register_schema(schema_json)
  end

  public
  def decode(data)
    if data.length < 5
      @logger.error('message is too small to decode')
    else
      datum = StringIO.new(data)
      # The first 5 bytes are the magic byte plus the big-endian schema id
      magic_byte, schema_id = datum.read(5).unpack("cI>")
      if magic_byte != MAGIC_BYTE
        @logger.error('message does not start with magic byte')
      else
        # Look up the writer's schema and decode the rest of the payload
        schema = get_schema(schema_id)
        decoder = Avro::IO::BinaryDecoder.new(datum)
        datum_reader = Avro::IO::DatumReader.new(schema)
        yield LogStash::Event.new(datum_reader.read(decoder))
      end
    end
  end

  public
  def encode(event)
    # extract the schema from the LogStash event...
    schema_json = event["value_schema"]
    # Use the schema to obtain a schema_id from the schema registry
    schema_id = add_schema(schema_json)
    @logger.info("schema id: #{schema_id}")
    # Start with the magic byte + schema id
    headers = [MAGIC_BYTE, schema_id]
    header = headers.pack("cI>")
    record = event["record"]
    # start by writing the header
    buffer = StringIO.new
    buffer.write(header)
    # Prepare the encoder that will write to the buffer
    encoder = Avro::IO::BinaryEncoder.new(buffer)
    schema = Avro::Schema.parse(schema_json)
    datum_writer = Avro::IO::DatumWriter.new(schema)
    # write the actual record using the encoder
    datum_writer.write(record, encoder)
    @on_event.call(event, buffer.string)
  end
end
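In case it helps with debugging, the header of a message taken off the topic can also be inspected directly; assuming the raw message bytes have been dumped to a file (the path below is just an example), something like this shows the schema id that actually went over the wire:

# unpack the 5-byte header of a raw message dumped to a file
raw = File.binread("/tmp/raw_message")   # example path
magic_byte, schema_id = raw[0, 5].unpack("cI>")
puts "magic byte: #{magic_byte}, schema id: #{schema_id}"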