When using the SQS input plugin from the 2.0 branch, and (some of the) 1.5 branch. Payloads with multiple json objects only return a single json object. When using the json codec, it returns the last json object in the payload. When using the json_lines codec, it returns the first object in the payload.
Here's a simplified config.
input {
sqs {
queue => "some_sqs_queue"
#codec => "json_lines"
codec => "json"
}
}
output {
stdout {
codec => rubydebug
}
}
and a sample payload
{"obj1": 1}
{"obj2": 2}
{"obj3": 3}
Tested:
- 5.0.0-apha2 - behavior described above
- 2.3.2 - behavior described above
- 2.2.4 - behavior described above
- 2.1.1 - behavior described above
- 2.0.0 - behavior described above
- 1.5.5 - behavior described above (need to manually fix https://github.com/logstash-plugins/logstash-input-sqs/pull/23)
- 1.5.4 - json_lines behaves as expected if {"obj3": 3} ends in a new line (no new line and the obj1 is dropped)
- 1.5.3 - json_lines behaves as expected if {"obj3": 3} ends in a new line (no new line and the obj3 is dropped)
I don't know the logstash codebase that well, but looking at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-sqs-2.0.5/lib/logstash/inputs/sqs.rb I see this chunk of code
def decode_event(message)
@logger.info(message.body)
@codec.decode(message.body) do |event|
@logger.info(event.to_json)
return event
end
end
I think that return should be an enqueue (or something like that), or wouldn't it always take one item, return and dump the rest on the ground?
1 Like
I hacked on /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-sqs-2.0.5/lib/logstash/inputs/sqs.rb and collapsed some of the nice small functions into something that copes with the json_lines codec. Off to write an issue.
def run(output_queue)
@logger.debug("Polling SQS queue", :polling_options => polling_options)
run_with_backoff do
poller.poll(polling_options) do |messages, stats|
break if stop?
messages.each do |message|
@codec.decode(message.body) do |event|
@logger.info(event.to_json)
add_sqs_data(event, message)
decorate(event)
output_queue << event
end
end
@logger.debug("SQS Stats:", :request_count => stats.request_count,
:received_message_count => stats.received_message_count,
:last_message_received_at => stats.last_message_received_at) if @logger.debug?
end
end
end
2 Likes