AVRO codec output doesn't accept default values

The intended flow is: file => Logstash => Kafka.
The output should be in Avro format, based on the schema file below.

{
	"type": "record",
	"namespace": "be.limero.log",
	"name": "LogEvent",
	"fields": [
		{
			"name": "schemaVersion",
			"type": "long",
			"default": 3
		},
		{
			"name": "timestamp",
			"type": {
				"type": "long",
				"logicalType": "timestamp-millis"
			}
		},
		{
			"name": "message",
			"type": "string",
			"default": ""
		},
		{
			"name": "host",
			"type": "string",
			"default": ""
		},
		{
			"name": "component",
			"type": "string",
			"default": ""
		}
	]
}

When I provide an event that does not contain the field schemaVersion, Logstash crashes with an exception:

An unexpected error occurred! {:error=>#<Avro::IO::AvroTypeError: The datum {"timestamp"=>1536358093, "message"=>"Started CUPS Scheduler.", "component"=>"systemd", "host"=>"pcpav2"} is not an example of schema {"type":"record","name":"LogEvent",....
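
As far as I can tell, the avro codec hands the event to the avro Ruby gem's DatumWriter, which validates the datum against the schema and does not fill in defaults at write time. A minimal sketch outside Logstash (assuming the avro gem, which the codec depends on; the reduced schema here is mine) reproduces the same error:

require 'avro'
require 'stringio'

# reduced version of the schema above; schemaVersion has a default
schema = Avro::Schema.parse <<-JSON
{
  "type": "record",
  "name": "LogEvent",
  "fields": [
    {"name": "schemaVersion", "type": "long", "default": 3},
    {"name": "message", "type": "string", "default": ""}
  ]
}
JSON

datum = { "message" => "Started CUPS Scheduler." }  # no schemaVersion

puts Avro::Schema.validate(schema, datum)  # => false, the default is not applied

writer = Avro::IO::DatumWriter.new(schema)
encoder = Avro::IO::BinaryEncoder.new(StringIO.new)
writer.write(datum, encoder)  # raises Avro::IO::AvroTypeError, as in the log above

If that is accurate, the "default" in an Avro schema is only applied when a reader decodes data that was written without the field, not when a writer encodes it, which would explain why the field has to be set explicitly in the pipeline.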

The pipeline configuration I use is below. With the schemaVersion lines commented out (as shown), the crash above occurs; when they are uncommented, so that the field is set explicitly on every event, it works.

#
# read a demo log file from a Linux system
# example: Sep  8 00:27:44 pcdell dhclient[1150]: DHCPREQUEST of 192.168.0.163 on eno1 to 192.168.0.1 port 67 (xid=0x31943e5b)
#
# due to the Avro codec, the generated event must match the schema exactly or Logstash halts
#
input {
  file {
    path => "/var/log/syslog"
    start_position => "beginning"
  }
}

filter {
#
# extract fields with the syslog line pattern
#
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{DATA:component}(?:\[%{POSINT:arguments}\])?: %{GREEDYDATA:syslog_message}" }
  }
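#
# for the example line above this should yield:
#   syslog_timestamp => "Sep  8 00:27:44", syslog_host => "pcdell",
#   component => "dhclient", arguments => "1150",
#   syslog_message => "DHCPREQUEST of 192.168.0.163 on eno1 to 192.168.0.1 port 67 (xid=0x31943e5b)"
#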
#
# rename fields to the standard names and overwrite them
#
  mutate {
    rename => {
      "syslog_message" => "message"
      "syslog_host" => "host"
    }
  }
#
# parse timestamp to @timestamp
#
  date {
    match => [ "syslog_timestamp" , "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
  }
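#
# two patterns because syslog pads single-digit days with an extra space ("MMM  d")
#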
#
# convert to an integer for Avro purposes
#
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').to_i)"
  }
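#
# note: to_i yields epoch seconds (e.g. 1536358093); since the schema declares
# timestamp-millis, milliseconds are presumably wanted instead, e.g.:
#   code => "event.set('timestamp', (event.get('@timestamp').to_f * 1000).to_i)"
#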
#
# drop garbage that would otherwise confuse the Avro schema
#
  mutate {
    remove_field => ["@version","path","syslog_timestamp","@timestamp","arguments"]
#   add_field => { "schemaVersion" => 3 }
  }
  mutate {
#   convert => { "schemaVersion" => "integer" }
  }
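#
# note: the convert is presumably needed because add_field values go through
# string interpolation; it sits in a separate mutate because mutate applies
# add_field last, so a convert in the same block would not see the new field
#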
}
output {
  stdout { codec => rubydebug }

  kafka {
    id => "HOME"
    bootstrap_servers => "192.168.0.163:9092"
    topic_id => "topic_avro_syslog"
    codec => avro {
      schema_uri => "avro-simple.json"
      tag_on_failure => true
    }
    value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
  }
}
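
A possible workaround (a sketch I have not verified) would be to fill in the default myself with a ruby filter before the codec sees the event, instead of the unconditional add_field above:

  ruby {
    code => "event.set('schemaVersion', 3) unless event.include?('schemaVersion')"
  }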
