I am trying to determine why the last part of my output section fails when it is not commented out. Here is my configuration. Could someone help me figure out why it's failing? It's saying I am missing
output {
if [type] == "dlq" {
kafka {
message_key => "%{userId}"
topic_id => "core.device.events.dlq"
compression_type => "snappy"
codec => "json"
}
}
# Handle Device Responses
else if [type] == "device-response" {
kafka {
message_key => "%{userId}"
topic_id => "core.device.commands.%{operation}.response"
compression_type => "snappy"
codec => "json"
}
}
# ALL Messages go to events.all
else {
kafka {
message_key => "%{userId}"
topic_id => "core.device.events.all"
compression_type => "snappy"
codec => "json"
}
# Create per-deviceType topics - Need to create the topics in Kafka when adding a new deviceType!
kafka {
message_key => "%{userId}"
topic_id => "core.device.events.%{deviceType}"
compression_type => "snappy"
codec => "json"
` }
# Create per-operation topics - Need to create the topics in Kafka when adding a new operation!
# if [operation] == "alarm" or [operation] == "remove-staged-software" or [operation] == "file-upload" {
# {
# message_key => "%{userId}"
# topic_id => "core.device.event.%{operation}"
# compression_type => "snappy"
# codec => "json"
# }
# }
}
Well, that was a typo from cutting and pasting into this wonderful UI while I was trying to get it to blockquote. The error is:
"[FATAL] 2019-01-31 19:31:02.427 [LogStash::Runner] runner - The given configuration is invalid. Reason: Expected one of #, and, or, xor, nand, { at line 96, column 40 (byte 2522) after output {"
Here is the failing config:
input {
  stomp {
    destination => "consumer.kafka-core.virtual.topic.out.events.all"
    codec => "json"
    host => "mq"
    type => "device-events"
  }
  stomp {
    destination => "consumer.kafka-core.virtual.topic.out.commands.all"
    codec => "json"
    host => "mq"
    type => "commands"
  }
  stomp {
    destination => "consumer.kafka-core.virtual.topic.out.DLQ.all"
    codec => "json"
    host => "mq"
    type => "dlq"
  }
  stomp {
    destination => "consumer.kafka-core.virtual.topic.out.response.all"
    codec => "json"
    host => "mq"
    type => "device-response"
  }
}
# Filters
filter {
  # For incoming events from devices
  if [type] == "device-events" {
    mutate {
      replace => { "type" => "event-%{deviceType}-%{operation}" }
    }
  }
  # Rules for outgoing commands sent to devices
  if [type] == "commands" {
    mutate {
      remove_field => [ "[payload][cert]" ]
      remove_field => [ "[payload][pwd]" ]
    }
    mutate {
      replace => { "type" => "command-%{deviceType}-%{operation}" }
    }
  }
  # Rules for ALL messages
  mutate {
    lowercase => [ "type" ]
  }
  date {
    locale => en
    match => [ "sentTimestamp" , "dd/MMM/yyyy:HH:mm:ss Z" , "yyyy-MM-dd HH:mm:ss,SSS" , "yyy-MM-dd HH:mm:ss,SSSZ" , "ISO8601" ]
  }
}
# Outputs
output {
  # Handle Dead Letter Messages
  if [type] == "dlq" {
    kafka {
      message_key => "%{siteId}"
      topic_id => "core.device.events.dlq"
      compression_type => "snappy"
      codec => "json"
    }
  }
  # Handle Device Responses
  else if [type] == "device-response" {
    kafka {
      message_key => "%{siteId}"
      topic_id => "core.device.commands.%{operation}.response"
      compression_type => "snappy"
      codec => "json"
    }
  }
  # ALL Messages go to events.all
  kafka {
    message_key => "%{siteId}"
    topic_id => "core.device.events.all"
    compression_type => "snappy"
    codec => "json"
  }
  # Create per-deviceType topics - Need to create the topics in Kafka when adding a new deviceType!
  kafka {
    message_key => "%{siteId}"
    topic_id => "core.device.events.%{deviceType}"
    compression_type => "snappy"
    codec => "json"
  }
  # Create per-operation topics - Need to create the topics in Kafka when adding a new operation!
  if [operation] == "alarm-event" OR [operation] == "remove-staged-software-event" OR [operation] == "file-upload-event" {
    kafka {
      message_key => "%{siteId}"
      topic_id => "core.device.event.%{operation}"
      compression_type => "snappy"
      codec => "json"
    }
  }
}
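A likely cause, judging from the parser message: the tokens it says it expected (and, or, xor, nand) are all lowercase, while the only boolean operators in this config are the uppercase OR in the per-operation conditional. As far as I can tell, the Logstash config grammar only recognizes these operators in lowercase, so the parser reads the first comparison and then stops at OR. Below is a minimal sketch of that last conditional with the operators lowercased. It assumes nothing else in the block is wrong, and it would replace the existing per-operation block inside output { ... }:

```
# Per-operation topics: same block as above, with lowercase "or"
if [operation] == "alarm-event" or [operation] == "remove-staged-software-event" or [operation] == "file-upload-event" {
  kafka {
    message_key => "%{siteId}"
    topic_id => "core.device.event.%{operation}"
    compression_type => "snappy"
    codec => "json"
  }
}
```

An equivalent way to write the condition, which avoids the repeated comparisons, should be the in operator: if [operation] in ["alarm-event", "remove-staged-software-event", "file-upload-event"] { ... }. Either variant can be checked without starting the pipeline by running bin/logstash -f with the path to the config file plus --config.test_and_exit.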