I'm trying to write my first logstash input plugin by following the guide. It uses the mqtt ruby gem to fetch from a mqtt message bus.
The method 'run' is able to fetch from mqtt, and the 'puts' statement shows the message. However the message never makes it to the queue.
queue << message
./bin/logstash -e 'input { stdin{} mqtt{} } output {stdout { codec => rubydebug }}'
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "stud/interval"
require "socket" # for Socket.gethostname
require 'mqtt'
class LogStash::Inputs::Mqtt < LogStash::Inputs::Base
config_name "mqtt"
default :codec, "plain"
config :state_topic, :validate => :string, :default => 'test'
public
def register
@client = MQTT::Client.new
@client.host = 'test.mosquitto.org'
@host = Socket.gethostname
end # def register
def run(queue)
@client.connect()
@client.subscribe( 'test' )
while !stop?
@client.get do |topic,message|
puts "#{topic}: #{message}"
decorate(message)
queue << message
end
Stud.stoppable_sleep(@interval) { stop? }
end # loop
@client.disconnect()
end # def run
def stop
@client.disconnect()
end
end # class LogStash::Inputs::Mqtt
Here I typed 'herpderp' into stdin, and logstash received and parsed it correctly.
I put the message "testing the message bus" from a separate mqtt broker. The message is received by logstash and printed, but is not parsed by logtash.
herpderp
{
"message" => "herpderp",
"@timestamp" => 2016-05-28T16:04:00.338Z,
"host" => "vagrant-ubuntu-trusty-64",
"@version" => "1"
}
test: testing the message bus
I've compared my 'run' method with many of the other similar logstash plugins. I dont' see why queue << message
isn't working.