Is there a way to set up Kafka message format in AVRO instead of JSON?


#1

I set up Filebeat send outputs to Kafka.
I would like the kafka messages are in Avro format instead of JSON format.
I am wondering what is the easiest way to achieve this if possible?

thanks!
yan


(Steffen Siering) #2

AVRO is not supported by beats. With each message having to potentially store the schema with the message, AVRO would be of limited use though.


#3

thanks for the clarification!
hmm, it is true that "With each message having to potentially store the schema with the message", it is rarely the case in practice, especially in the case of shipping out log messages in filebeat, all my log messages follows single schema.
In this case, AVRO would be more efficient than JSON, since every JSON message needs carry the all field names?

or maybe I mis-interpret how transfer AVRO works in kafka? does kafka allows pre-register schema, or it needs to carry schema in each kafka message?


(Steffen Siering) #4

Kafka does not care about the encoding of it's events. An event is just some binary data. Plus, kafka is not a pure single-queue, but acts more like a distributed append only storage. With distributed I mean, even a topic you publish too is partitioned, with every partition being an append only storage/queue itself. You can start/add an alternative consumer-group at any time. This might require your consumer to support different schema versions to support historical and new data at the same time (depends how long data are held in kafka and which offset you start reading).

Beats output is not filebeat only, but must serve all beats use cases. But even filebeat has no set schema + even the meta-data we add might change in the future. These might require changes to the schema. Some already available features changing the actual events schema: add docker metadata (including labels), add kubernetes meta-data, custom fields/tags, drop_fields, include_fields... That is, we would have to dynamically determine the actual schema in the output. With encodings like JSON, the schema is implicit and dynamic -> problem solved :wink: .

The problems one has to solve with beats are: dynamically generating/updating the schema (a schema might be very dynamic and updated every so often) , schema versioning and schema exchange. We might not know all fields on start + docker labels and other external meta-data can add new fields to an existing schema at any time -> at worst one might require a different schema by machine/beat instance in the cluster.

With encodings requiring schemas being explicitly defined, you have to define a way how the schemas are exchanged. One can define some kind of in-band or out-of-band schema exchange mechanism/protocol, but tbh none fit kafka very well in a most generic way. Having some kind of schema exchange in TCP for example is easy, as the connection and data-exchange is short-lived (even if it's active for days) in comparison to kafka. You can think of the schema as some kind of state the producer and consumer need to agree on at any time.

In-band schema exchange would require the producer to send the schema update to every partition + requires correct ordering of events on retry. Plus, as consumers are allowed to be started/added/restarted at any time you will need to store the schema at the consumers place. You also need to initialise the schema in the consumer to the most recent correct version, once you start the consumer the first time. Depending which offset you start the consumer at, the meaning of correct version might differ. This is due to the consumer potentially starting anywhere in the middle of data being pushed via kafka (as kafka deletes old segments based on it's retention policies). I'd say the in-band schema exchange is no good model for Kafka.

Some out-of band protocol might require you to store the most recent schema in another topic or service. If the consumer finds the schema doesn't fit, the consumer needs to fetch the matching schema. Avro messages can include a 8byte hash of the used schema, to detect usage of a different schema. With producers pushing async, there is a chance a consumer seeing a message with updated schema, before actually being able to see the schema update (just fetch and wait for schema update to be available). This functionality is actually provided by the Confluent Schema Registry. The schema registry is another service (HTTP REST interface) provided by Confluent one needs to integrate with. Confluent also provides some java libraries, so you don't have to deal with all the actual logic required to get this working correctly. But it's a service one needs to run next to Kafka and Zookeeper and producers/consumer need to integrate with. Just hope for your schema exchange server not failing :wink: . Don't get me wrong, it's actually a good/viable solution regarding the problem space. But personally I'd prefer not to have to integrate with yet another service.

I've been looking at binary encodings, e.g. for transfer and local storage in beats (think disk based queue). Currently JSON is the common denominator in the Elastic Stack. An event in beats must be self-contained. E.g. in case the codec gets changed when updating the output, events might need to be transcoded (the internal structure must be 'streamable'). Plus, JSON limits the actual types an encoding must/can support. From my point of view different kind of binary json formats (well, in a number of cases subsets) and a subset of CBOR did so far meet beats requirements (e.g. msgpack is no option due to the format requiring us to know some object sizes in advance).

But having to integrate some schema exchange via another service is another story and nothing we are looking for in the near future.


(system) #5

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.