SQS input plugin only pulls one JSON object from SQS Payload

With the SQS input plugin from the 2.0 branch (and some of the 1.5 branch), a payload containing multiple JSON objects produces only a single event. With the json codec, the event is the last JSON object in the payload. With the json_lines codec, it is the first object in the payload.

Here's a simplified config.

input {
  sqs {
    queue => "some_sqs_queue"
    #codec => "json_lines"
    codec => "json"
  }
}

output {
  stdout { 
    codec => rubydebug
  }
}

and a sample payload

{"obj1": 1}
{"obj2": 2}
{"obj3": 3}

Tested:

  • 5.0.0-alpha2 - behavior described above
  • 2.3.2 - behavior described above
  • 2.2.4 - behavior described above
  • 2.1.1 - behavior described above
  • 2.0.0 - behavior described above
  • 1.5.5 - behavior described above (need to manually fix https://github.com/logstash-plugins/logstash-input-sqs/pull/23)
  • 1.5.4 - json_lines behaves as expected if {"obj3": 3} ends in a newline (without the trailing newline, obj1 is dropped)
  • 1.5.3 - json_lines behaves as expected if {"obj3": 3} ends in a newline (without the trailing newline, obj3 is dropped)

I don't know the logstash codebase that well, but looking at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-sqs-2.0.5/lib/logstash/inputs/sqs.rb I see this chunk of code

  def decode_event(message)
    @logger.info(message.body)
    @codec.decode(message.body) do |event|
      @logger.info(event.to_json)
      return event
    end
  end

I think that return should enqueue the event instead (or something like that). As written, won't it always take the first item the codec yields, return, and drop the rest on the floor?
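
To check my reading of Ruby here, a small standalone sketch. `FakeLinesCodec`, `decode_first_only` and `decode_all` are names I made up for illustration, and the fake codec only mimics the "yield one event per line" shape; it is not the real json_lines codec.

```ruby
# Stand-in codec (hypothetical): yields one "event" per non-empty line.
class FakeLinesCodec
  def decode(data)
    data.each_line do |line|
      line = line.strip
      yield line unless line.empty?
    end
  end
end

# Same shape as the plugin's decode_event: a `return` inside the block
# exits the whole method as soon as the first event is yielded.
def decode_first_only(codec, body)
  codec.decode(body) do |event|
    return event
  end
end

# Alternative shape: hand every decoded event to the caller's block.
def decode_all(codec, body, &block)
  codec.decode(body, &block)
end

codec = FakeLinesCodec.new
body  = "a\nb\nc\n"

p decode_first_only(codec, body)   # => "a"

collected = []
decode_all(codec, body) { |event| collected << event }
p collected                        # => ["a", "b", "c"]
```

So with a codec that yields several events, a `return` inside the block only ever surfaces the first one, which matches what I see with json_lines.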


I hacked on /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-sqs-2.0.5/lib/logstash/inputs/sqs.rb and collapsed some of the nice small functions into something that copes with the json_lines codec. Off to write an issue.

def run(output_queue)
  @logger.debug("Polling SQS queue", :polling_options => polling_options)

  run_with_backoff do
    poller.poll(polling_options) do |messages, stats|
      break if stop?

      messages.each do |message|
        @codec.decode(message.body) do |event|
          @logger.info(event.to_json)
          add_sqs_data(event, message)
          decorate(event)
          output_queue << event
        end
      end

      @logger.debug("SQS Stats:", :request_count => stats.request_count,
                    :received_message_count => stats.received_message_count,
                    :last_message_received_at => stats.last_message_received_at) if @logger.debug?
    end
  end
end