The intended flow is: file => logstash => kafka.
The output should be in AVRO format, based on this schema file:
{
"type": "record",
"namespace": "be.limero.log",
"name": "LogEvent",
"fields": [
{
"name": "schemaVersion",
"type": "long",
"default": 3
},
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "message",
"type": "string",
"default": ""
},
{
"name": "host",
"type": "string",
"default": ""
},
{
"name": "component",
"type": "string",
"default": ""
}
]
}
When I provide an event that does not contain the field schemaVersion, logstash crashes with an exception:
An unexpected error occurred! {:error=>#<Avro::IO::AvroTypeError: The datum {"timestamp"=>1536358093, "message"=>"Started CUPS Scheduler.", "component"=>"systemd", "host"=>"pcpav2"} is not an example of schema {"type":"record","name":"LogEvent",....
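For what it's worth, the same validation error can be reproduced outside logstash with the avro Ruby gem (as far as I can tell, this is the library the avro codec wraps). A minimal sketch, assuming avro-simple.json sits in the working directory:

require 'avro'
require 'stringio'

schema = Avro::Schema.parse(File.read('avro-simple.json'))
writer = Avro::IO::DatumWriter.new(schema)
encoder = Avro::IO::BinaryEncoder.new(StringIO.new)

# the same datum as in the error message above, i.e. without schemaVersion
datum = {
  'timestamp' => 1536358093,
  'message'   => 'Started CUPS Scheduler.',
  'component' => 'systemd',
  'host'      => 'pcpav2'
}

# raises Avro::IO::AvroTypeError; adding 'schemaVersion' => 3 makes it pass
writer.write(datum, encoder)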
The script used is below; when the lines on schemaVersion are commented out, it works.
#
# get a demo log file from Linux system
# example : Sep 8 00:27:44 pcdell dhclient[1150]: DHCPREQUEST of 192.168.0.163 on eno1 to 192.168.0.1 port 67 (xid=0x31943e5b)
#
# due to the AVRO codec, the generated event must match the schema exactly or logstash halts
#
input {
file {
path => "/var/log/syslog"
start_position => "beginning"
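# note: "beginning" only applies the first time the file is seen; after that the
# sincedb remembers the read position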
}
}
filter {
#
# extract fields from the syslog line pattern
#
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{DATA:component}(?:\[%{POSINT:arguments}\])?: %{GREEDYDATA:syslog_message}" }
}
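#
# for the example line above this yields roughly:
# syslog_timestamp => "Sep 8 00:27:44", syslog_host => "pcdell",
# component => "dhclient", arguments => "1150",
# syslog_message => "DHCPREQUEST of 192.168.0.163 ..."
#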
#
# rename fields to standard names and overwrite
#
mutate {
rename => {
"syslog_message" => "message"
"syslog_host" => "host"
}
}
#
# parse timestamp to @timestamp
#
date {
match => [ "syslog_timestamp" , "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
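# two patterns: single-digit days ("Sep 8 ...") and double-digit days ("Sep 28 ...")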
}
#
# convert to integer for AVRO purpose
#
ruby {
code => "event.set('timestamp',event.get('@timestamp').to_i)"
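# note: to_i yields epoch seconds (e.g. 1536358093); a real timestamp-millis
# value would need (event.get('@timestamp').to_f * 1000).to_i instead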
}
#
# drop garbage that will otherwise confuse AVRO schema
#
mutate {
remove_field => ["@version","path","syslog_timestamp","@timestamp","arguments"]
# add_field => { "schemaVersion" => 3 }
}
mutate {
# convert => { "schemaVersion" => "integer" }
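# together with the add_field above, this would give every event a schemaVersion
# as an integer, matching the schema's long type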
}
}
output {
stdout { codec => rubydebug }
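# rubydebug prints the event just before encoding, handy to verify that every
# schema field is present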
kafka {
id => "HOME"
bootstrap_servers => "192.168.0.163:9092"
topic_id => "topic_avro_syslog"
codec => avro {
schema_uri => "avro-simple.json"
tag_on_failure => true
}
value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
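# byte-array serializer instead of the default StringSerializer, so the
# AVRO-encoded value reaches kafka as-is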
}
}