Kafka is reading off the wrong schema id

Hi,

I am reading a JSON input file and feeding it to a JSON-to-Avro codec before sending it to a Kafka topic.
The codec takes the schema from the event, registers it with the schema registry, and obtains a schema id. It then writes the magic byte and schema id header, followed by the Avro-encoded record data. The whole payload is then sent to the Kafka topic.
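
For reference, the header the codec writes is the standard Confluent wire format: one magic byte (0) followed by the schema id as a 4-byte big-endian unsigned integer, then the Avro binary body. A minimal sketch of just the framing (181 is the id from my test run):

# Confluent wire format framing: magic byte + 4-byte big-endian schema id.
# "CN" produces the same bytes as the "cI>" template used in the codec below
# (C = unsigned 8-bit, N = unsigned 32-bit big-endian), assuming a 4-byte native int.
header = [0, 181].pack("CN")                # => "\x00\x00\x00\x00\xB5"
magic_byte, schema_id = header.unpack("CN") # => [0, 181]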

This is the Logstash configuration file:

input {
  file {
    path => '/tmp/logstash_input'
    codec => 'json'
  }
}
output {
  kafka {
    topic_id => 'test_topic'
    codec => avro_schema_registry {
      endpoint => 'http://localhost:8081'
    }
  }
}
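
One thing I am not sure about is whether the kafka output passes the codec's binary payload through untouched. If the plugin version supports it (I have not verified this for mine), forcing a byte-array serializer would rule out any string re-encoding of the binary payload:

output {
  kafka {
    topic_id => 'test_topic'
    # Assumption: this option may not exist in older versions of the kafka output
    value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
    codec => avro_schema_registry {
      endpoint => 'http://localhost:8081'
    }
  }
}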

I write the following to the input file:

{"value_schema":"{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"request_id\",\"type\":\"string\"},{\"name\":\"activity_id\",\"type\":\"int\"},{\"name\":\"counter\",\"type\":\"int\"},]}","record":{"request_id":"request_1","activity_id":34,"counter":1}}

I can see that the encode function is executed, and the following schema id is logged: 181
However, when I consume the Avro message off the topic with the following command:

./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic test_topic

The consumer complains that it is attempting to look up a schema with a different schema id...
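
To check which id actually lands on the wire, my plan is to dump one raw message to a file (the path below is just a placeholder) and unpack the first five bytes:

# Header check, assuming one raw message has been saved to /tmp/msg.bin
# (e.g. via the plain kafka-console-consumer). The first five bytes should
# decode to magic byte 0 and schema id 181.
raw = File.binread("/tmp/msg.bin")
magic_byte, schema_id = raw[0, 5].unpack("CN")
puts "magic=#{magic_byte} schema_id=#{schema_id}"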

Can anyone tell whether the problem is in the codec code or in the way Kafka is configured?

Below is the Avro codec that I wrote:

# encoding: utf-8
require "avro"
require "schema_registry"
require "schema_registry/client"
require "logstash/codecs/base"
require "logstash/event"
require "logstash/timestamp"
require "logstash/util"
require "json"

MAGIC_BYTE = 0

class LogStash::Codecs::AvroSchemaRegistry < LogStash::Codecs::Base
  config_name "avro_schema_registry"

  # schema registry endpoint and credentials
  config :endpoint, :validate => :string, :required => true
  config :username, :validate => :string, :default => nil
  config :password, :validate => :string, :default => nil

  public
  def register
    @client = SchemaRegistry::Client.new(endpoint, username, password)
    @schemas = Hash.new
  end

  def get_schema(schema_id)
    # Cache schemas by id so each one is fetched from the registry only once
    if !@schemas.has_key?(schema_id)
      @schemas[schema_id] = Avro::Schema.parse(@client.schema(schema_id))
    end
    @schemas[schema_id]
  end

  def add_schema(schema_json)
    # Register the schema under a fixed subject; returns the id assigned by the registry
    subject = @client.subject('test.schema_registry')
    subject.register_schema(schema_json)
  end

  public
  def decode(data)
    if data.length < 5
      @logger.error('message is too small to decode')
    else
      datum = StringIO.new(data)
      # Read the 5-byte header: magic byte + big-endian schema id
      magic_byte, schema_id = datum.read(5).unpack("cI>")
      if magic_byte != MAGIC_BYTE
        @logger.error('message does not start with magic byte')
      else
        # Look up (and cache) the writer's schema, then decode the rest of the payload
        schema = get_schema(schema_id)
        decoder = Avro::IO::BinaryDecoder.new(datum)
        datum_reader = Avro::IO::DatumReader.new(schema)
        yield LogStash::Event.new(datum_reader.read(decoder))
      end
    end
  end

  public
  def encode(event)
    # Extract the schema from the LogStash event
    schema_json = event["value_schema"]

    # Use the schema to obtain a schema id from the schema registry
    schema_id = add_schema(schema_json)
    @logger.info("schema id: #{schema_id}")

    # Build the header: magic byte + big-endian schema id
    header = [MAGIC_BYTE, schema_id].pack("cI>")

    record = event["record"]

    # Start by writing the header to the buffer
    buffer = StringIO.new
    buffer.write(header)

    # Prepare the encoder that will write to the buffer
    encoder = Avro::IO::BinaryEncoder.new(buffer)

    schema = Avro::Schema.parse(schema_json)
    datum_writer = Avro::IO::DatumWriter.new(schema)

    # Write the actual record using the encoder
    datum_writer.write(record, encoder)

    @on_event.call(event, buffer.string)
  end
end