AVRO codec output doesn't accept default values

The intended flow is: file => logstash => kafka.
The output should be in AVRO format, based on this schema file:

{
	"type": "record",
	"namespace": "be.limero.log",
	"name": "LogEvent",
	"fields": [
		{
			"name": "schemaVersion",
			"type": "long",
			"default": 3
		},
		{
			"type": "long",
			"logicalType": [
			"name": "timestamp"
		},
		{
			"name": "message",
			"type": "string",
			"default": ""
		},
		{
			"name": "host",
			"type": "string",
			"default": ""
		},
		{
			"name": "component",
			"type": "string",
			"default": ""
		}
	]
}

When I provide an event that does not contain the field schemaVersion, Logstash crashes with an exception: An unexpected error occurred! {:error=>#<Avro::IO::AvroTypeError: The datum {"timestamp"=>1536358093, "message"=>"Started CUPS Scheduler.", "component"=>"systemd", "host"=>"pcpav2"} is not an example of schema {"type":"record","name":"LogEvent",....
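
To make the mismatch in that message concrete, here is a minimal Ruby sketch that lists which schema fields the rejected datum lacks. It does not call the avro gem; `missing_fields` is a hypothetical helper, and the schema is a trimmed copy in which the timestamp field is assumed to be a plain long.

```ruby
require 'json'

# Hypothetical helper: names of the schema fields that the event does not carry.
def missing_fields(schema, event)
  schema.fetch('fields').map { |f| f.fetch('name') }.reject { |name| event.key?(name) }
end

# Trimmed copy of the schema (assumption: timestamp simplified to a plain long).
LOG_EVENT_SCHEMA = JSON.parse(<<~SCHEMA_JSON)
  {
    "type": "record",
    "name": "LogEvent",
    "fields": [
      { "name": "schemaVersion", "type": "long", "default": 3 },
      { "name": "timestamp", "type": "long" },
      { "name": "message", "type": "string", "default": "" },
      { "name": "host", "type": "string", "default": "" },
      { "name": "component", "type": "string", "default": "" }
    ]
  }
SCHEMA_JSON

# The datum quoted in the exception message.
CRASHING_EVENT = {
  'timestamp' => 1536358093,
  'message'   => 'Started CUPS Scheduler.',
  'component' => 'systemd',
  'host'      => 'pcpav2'
}

puts missing_fields(LOG_EVENT_SCHEMA, CRASHING_EVENT).inspect
# prints ["schemaVersion"]
```

The only absent field is the one that declares a default, which is consistent with the writer not applying that default on output.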

This is the script I use. It works when the lines on schemaVersion are commented out.

# Read a demo log file from a Linux system.
# Example: Sep  8 00:27:44 pcdell dhclient[1150]: DHCPREQUEST of on eno1 to port 67 (xid=0x31943e5b)
# Because of the AVRO codec, the generated event must match the schema exactly or Logstash halts.
input {
  file {
    path => "/var/log/syslog"
    start_position => "beginning"
  }
}

filter {
  # extract the line pattern
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{DATA:component}(?:\[%{POSINT:arguments}\])?: %{GREEDYDATA:syslog_message}" }
  }
  # rename fields to the standard names and overwrite
  mutate {
    rename => {
      "syslog_message" => "message"
      "syslog_host" => "host"
    }
  }
  # parse timestamp to @timestamp
  date {
    match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
  }
  # convert to integer for AVRO purposes
  ruby {
    code => "event.set('timestamp',event.get('@timestamp').to_i)"
  }
  # drop garbage that would otherwise confuse the AVRO schema
  mutate {
    remove_field => ["@version","path","syslog_timestamp","@timestamp","arguments"]
    # add_field => { "schemaVersion" => 3 }
  }
  mutate {
    # convert => { "schemaVersion" => "integer" }
  }
}

output {
  stdout { codec => rubydebug }

  kafka {
    id => "HOME"
    bootstrap_servers => ""
    topic_id => "topic_avro_syslog"
    codec => avro {
      schema_uri => "avro-simple.json"
      tag_on_failure => true
    }
    value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
  }
}
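
The commented-out add_field and convert lines hard-code the value in the pipeline. As a more general illustration, the sketch below fills every absent field from the schema's own "default" entry before the event would reach the encoder. It is plain Ruby over hashes, not the avro gem or the Logstash API: `apply_defaults` is a hypothetical helper, and the schema is a trimmed copy with the timestamp field assumed to be a plain long.

```ruby
require 'json'

# Hypothetical helper: return a copy of the event in which every field that is
# absent, and for which the schema declares a "default", is set to that default.
def apply_defaults(schema, event)
  filled = event.dup
  schema.fetch('fields').each do |field|
    name = field.fetch('name')
    next if filled.key?(name) || !field.key?('default')
    filled[name] = field['default']
  end
  filled
end

# Trimmed copy of the schema (assumption: timestamp simplified to a plain long).
DEFAULTS_SCHEMA = JSON.parse(<<~SCHEMA_JSON)
  {
    "type": "record",
    "name": "LogEvent",
    "fields": [
      { "name": "schemaVersion", "type": "long", "default": 3 },
      { "name": "timestamp", "type": "long" },
      { "name": "message", "type": "string", "default": "" },
      { "name": "host", "type": "string", "default": "" },
      { "name": "component", "type": "string", "default": "" }
    ]
  }
SCHEMA_JSON

event = {
  'timestamp' => 1536358093,
  'message'   => 'Started CUPS Scheduler.',
  'component' => 'systemd',
  'host'      => 'pcpav2'
}

puts apply_defaults(DEFAULTS_SCHEMA, event)['schemaVersion']
# prints 3
```

Inside Logstash the same loop would presumably have to go through event.get and event.set in a ruby filter; that adaptation is untested here. Fields without a default, such as timestamp, are left alone, so an event that lacks them would still be rejected.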
