Not able to read Kafka - Avro Schema messages

I want to read Snappy-compressed Avro messages stored in Kafka into Elasticsearch. I am using the Kafka input plugin and the Logstash Avro codec, but I am not able to read the compressed messages. Do note that I am able to read plain, non-Avro messages from Kafka.

Sounds to me like you need a Snappy / Avro codec plugin to decode the data after it has been pulled from the queue.

I am already using an Avro plugin.

Sorry about that, I looked at the docs site and did not see it listed.

After reading the plugin code and looking at the Avro Ruby library, compression is not implemented yet.
AVRO-1659 "Add Snappy support for Ruby" was filed in March and has not been closed yet:

https://issues.apache.org/jira/browse/AVRO-1659

hey, thanks a lot!

Ah yes, Snappy is not supported just yet, but please file an issue on GitHub for the Avro codec.
We may be able to explore a Java implementation, thereby replacing the Ruby Avro library with one that does support Snappy.
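For reference, the Java Avro library already ships a Snappy codec for Avro container files (it needs snappy-java on the classpath), so a Java-backed codec could handle this. A rough sketch only, using a placeholder one-field schema rather than anything from this thread:

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import java.io.File;

public class SnappyAvroFileExample {
    public static void main(String[] args) throws Exception {
        // Placeholder schema, just for illustration.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"event\",\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}");
        File file = new File("events.avro");

        // Write a Snappy-compressed Avro container file.
        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        writer.setCodec(CodecFactory.snappyCodec());
        writer.create(schema, file);
        GenericRecord record = new GenericData.Record(schema);
        record.put("message", "Hello World");
        writer.append(record);
        writer.close();

        // The reader picks up both the schema and the codec from the file header
        // and decompresses transparently.
        DataFileReader<GenericRecord> reader =
            new DataFileReader<>(file, new GenericDatumReader<GenericRecord>());
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
        reader.close();
    }
}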

I switched from Snappy compression to null (no) compression today. Even then, the Avro codec is not able to read the .avsc schema and decode the messages. Do note that we are using the KafkaAvroSerializer to serialize the data.

The Kafka Avro serializer does not necessarily produce only Avro.

From what I see, it prepends some values that are not Avro-encoded.
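For context: the Confluent KafkaAvroSerializer writes a 5-byte header (a magic byte of 0 followed by a 4-byte Schema Registry id) in front of the Avro-encoded payload. A minimal Java sketch of peeking at those leading bytes on a raw record value, assuming that wire format:

import java.nio.ByteBuffer;

public class ConfluentHeaderPeek {
    // value: raw bytes of a Kafka record produced with KafkaAvroSerializer
    public static void peek(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte magic = buf.get();           // 0 for the Confluent wire format
        int schemaId = buf.getInt();      // 4-byte Schema Registry id (big-endian)
        int avroBytes = buf.remaining();  // the rest is the plain Avro-encoded datum
        System.out.printf("magic=%d schemaId=%d avroBytes=%d%n", magic, schemaId, avroBytes);
    }
}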

Hi,

Any solution to this problem? We are also facing the same issue. We are trying to send messages from Logstash to Kafka to HDFS.

We are using the following tech stack:

  1. Logstash 2.3 - current production version
  2. Confluent 3.0
  3. Plugins:
    a. logstash-output-kafka plugin
    b. logstash-codec-avro
  4. ZooKeeper: 3.4.6
  5. Kafka: 0.10.0.0

Our Logstash config file looks like this:

input {
  stdin {}
}

filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  kafka {
    topic_id => 'logstash_logs14'
    codec => avro {
      schema_uri => "/opt/logstash/bin/schema.avsc"
    }
  }
}

The schema.avsc file looks like this:

{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {"name": "message", "type": "string"},
    {"name": "host", "type": "string"}
  ]
}

  1. Start Zookeeper in its own terminal
    ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

  2. Start Kafka in its own terminal
    ./bin/kafka-server-start ./etc/kafka/server.properties

  3. Start the Schema Registry in its own terminal
    ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

  4. From the Logstash directory, run the following command
    bin/logstash -f ./bin/logstash.conf

  5. Type the log message that you wish to send to Kafka after running the above command, e.g. "Hello World"

  6. Consume the topic from Kafka
    ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning

While consuming we get the following error:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Thanks,
Upendra

What versions are your Logstash Kafka plugins? I think 2.3 only supports
Kafka 0.9.x

Hi Joe,

Thanks for your response.

We have updated Logstash to 2.3, but Kafka is 0.10 (the latest version), and we are still getting the same issue. Should we downgrade Kafka to 0.9?

Yeah you need to run 0.9 for now.

Hi Joe,

We have downgraded Kafka to 0.9 but are still facing the same deserialization error (as shown above) while consuming with the kafka-avro-console-consumer. It looks like the consumer is not picking up the right schema. Can you help?

Thanks!

There are two problems:

  1. The Schema Registry API returns the Avro schema wrapped in a "schema" object, while the codec expects the Avro schema not to be nested inside another object. You bypassed this problem by defining the schema with a file from local disk (it is not ideal, but it works :).

  2. Every message inside the Confluent Platform should have the following structure:
    < magic byte > < schema id (4 bytes) > < Avro blob >
    Currently the codec does not prefix the Avro blob with the magic byte or the schema id. The kafka-avro-console-consumer expects a magic byte and a schema id in the first 5 bytes, so it cannot pick up the right schema.

The only solution for this is to write a new codec that prefixes the Avro blob (a rough sketch of that prefixing is below). It should not be hard, but I am not a Ruby developer :frowning:
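As an illustration of point 2, the prefixing on the encode side amounts to writing the magic byte and schema id in front of the binary-encoded datum. A hedged Java sketch, where the schema id is a placeholder that would really have to come from the Schema Registry:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class ConfluentFramedEncoder {
    private static final byte MAGIC_BYTE = 0x0;

    // schemaId must be the id the Schema Registry assigned to this schema (placeholder here).
    public static byte[] encode(GenericRecord record, Schema schema, int schemaId) throws IOException {
        // Plain Avro binary encoding of the datum (no container-file header).
        ByteArrayOutputStream avroOut = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(avroOut, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        byte[] avroBytes = avroOut.toByteArray();

        // Prefix with <magic byte><4-byte schema id> so Confluent consumers can resolve the schema.
        ByteBuffer framed = ByteBuffer.allocate(1 + 4 + avroBytes.length);
        framed.put(MAGIC_BYTE);
        framed.putInt(schemaId);
        framed.put(avroBytes);
        return framed.array();
    }
}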

Hi Joe & Team,

Any idea of a codec that prefixes the Avro blob? Is there anything being worked on?

Thanks,
Upendra

I'm working on a pull request against logstash-codec-avro that will add support for stripping the magic byte and schema id when the strip_header config option is set to true: https://github.com/logstash-plugins/logstash-codec-avro/pull/20
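For anyone who needs the decode direction in the meantime, stripping the header is the inverse operation: skip the first 5 bytes and hand the remainder to an Avro datum reader. A minimal Java sketch under the same wire-format assumption (a real consumer would look the schema up by the id rather than ignoring it):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import java.io.IOException;
import java.nio.ByteBuffer;

public class ConfluentFramedDecoder {
    public static GenericRecord decode(byte[] value, Schema schema) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(value);
        if (buf.get() != 0x0) {
            throw new IOException("Unknown magic byte - not Confluent wire format");
        }
        int schemaId = buf.getInt(); // ignored here; normally used to fetch the schema from the registry
        byte[] avroBytes = new byte[buf.remaining()];
        buf.get(avroBytes);

        // Decode the remaining bytes as a plain Avro binary datum.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}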