Why doesn't this plugin add messages to the queue?

I'm trying to write my first Logstash input plugin by following the guide. It uses the mqtt Ruby gem to fetch messages from an MQTT message bus.

The 'run' method is able to fetch from MQTT, and the 'puts' statement shows the message. However, the message never makes it to the queue.

queue << message

./bin/logstash -e 'input { stdin{} mqtt{} } output {stdout { codec => rubydebug }}'

# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "stud/interval"
require "socket" # for Socket.gethostname
require 'mqtt'


class LogStash::Inputs::Mqtt < LogStash::Inputs::Base
  config_name "mqtt"
  default :codec, "plain"
  config :state_topic, :validate => :string, :default => 'test'

  public
  def register

    @client = MQTT::Client.new
    @client.host = 'test.mosquitto.org'
    @host = Socket.gethostname
  end # def register

  def run(queue)
    @client.connect()
    @client.subscribe( 'test' )
    while !stop?
      @client.get do |topic,message|
        puts "#{topic}: #{message}"
        decorate(message)
        queue << message
      end
      Stud.stoppable_sleep(@interval) { stop? }
    end # loop
    @client.disconnect()
  end # def run

  def stop
    @client.disconnect()
  end
end # class LogStash::Inputs::Mqtt

Here I typed 'herpderp' into stdin, and Logstash received and parsed it correctly.
I then published the message "testing the message bus" from a separate MQTT broker. Logstash receives and prints the message, but does not parse it into an event.

herpderp
{
       "message" => "herpderp",
    "@timestamp" => 2016-05-28T16:04:00.338Z,
          "host" => "vagrant-ubuntu-trusty-64",
      "@version" => "1"
}
test: testing the message bus

I've compared my 'run' method with those of several similar Logstash plugins, and I don't see why queue << message isn't working. The plugins I looked at:

stdin
zeromq
lumberjack
tcp

Sharing a working solution. The raw message needs to be decoded by the codec first, so that an event (rather than a plain string) is pushed onto the queue.

  def run(output_queue)
    @client.connect()
    @client.subscribe( 'test' )
    while !stop?
      @client.get do |topic,message|
        @codec.decode(message) do |event|
          decorate(event)
          event["host"] = @host if !event.include?("host")
          output_queue << event
        end
      end
      Stud.stoppable_sleep(@interval) { stop? }
    end # loop
    @client.disconnect()
  end # def run
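
To make the difference concrete, here is a minimal sketch that runs outside Logstash. FakeEvent and FakePlainCodec are hypothetical stand-ins I made up for illustration; they only mimic the general shape of an event and of a codec's decode method, and are not the real Logstash classes. The host name is also just a placeholder.

```ruby
# Hypothetical stand-in for an event: a thin wrapper around a Hash.
# This is NOT the real Logstash event class.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def include?(key)
    @fields.key?(key)
  end

  def [](key)
    @fields[key]
  end

  def []=(key, value)
    @fields[key] = value
  end
end

# Hypothetical stand-in for a "plain" codec: wraps the raw payload in an
# event and yields it to the caller's block.
class FakePlainCodec
  def decode(data)
    yield FakeEvent.new("message" => data)
  end
end

raw = "testing the message bus"

# What the question's code did: push the raw String onto the queue.
bad_queue = []
bad_queue << raw
puts bad_queue.first.class    # prints String

# What the working version does: decode first, then push the event.
codec = FakePlainCodec.new
output_queue = []             # stand-in for the pipeline queue
codec.decode(raw) do |event|
  event["host"] = "example-host" unless event.include?("host")
  output_queue << event
end

first = output_queue.first
puts first.class              # prints FakeEvent
puts first["message"]         # prints testing the message bus
```

In the real plugin the same pattern applies: @codec.decode yields event objects, and those are what go onto the queue.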