Hello @ohk,
What exactly are the symptoms you are seeing?
- Incorrectly parsed data?
- A Logstash hang?
- Any errors in the log?
What is the structure of the document you are fetching from HDFS? Is it one JSON document per line? In that case the json codec might be more appropriate, and you could make your input extract one line at a time and send it to the codec's decode method, as in the sketch below.
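Here is a minimal sketch of that idea. I am assuming that `@client.read(filename)` returns the whole file content as a String and that `out_queue` is your plugin's queue; adjust the names to your code.

```ruby
# Sketch only: decode one line at a time with the json codec.
# Assumption: @client.read(filename) returns the file content as a String.
@client.read(filename).each_line do |line|
  @codec.decode(line) do |event|
    decorate(event)
    out_queue << event
  end
end
```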
I've taken a quick look at the code. I don't know the details of the library, but I can give you this recommendation.
I think there is a small problem in the `#parse_events` method: you read from the client with `@client.read`, write the result to a file, and then try to read back from the file that you are currently writing?
```ruby
def parse_events(filename, out_queue)
  FileUtils::mkdir_p "/tmp/logstash-input-webhdfs"
  File.open("/tmp/logstash-input-webhdfs/tmp_#{filename.to_i(32)}", 'w+') do |io|
    io.write(@client.read(filename))
    # The file pointer is now at the end of the file,
    # so the io.read below returns an empty string.
    @codec.decode(io.read) do |event|
      event["host"] = @host+":"+@port
      event["filename"] = filename
      decorate(event)
      @logger.debug("Event : #{event}")
      out_queue << event
    end
  end
end # def parse_events
```
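To make the problem concrete, here is a tiny standalone demonstration of the file-pointer behaviour (plain Ruby, nothing plugin-specific):

```ruby
# After a write, the pointer of the handle sits at the end of the file,
# so a read from the same handle returns an empty string.
File.open("/tmp/demo.txt", "w+") do |io|
  io.write("hello\n")
  io.read    # => "" (pointer is at end of file)
  io.rewind  # move the pointer back to the start
  io.read    # => "hello\n"
end
```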
I don't know what `@client.read` returns; is it a line or a chunk of text? I think you could solve this problem in two different ways:
- Read the file from the service and write it to disk, then rewind to the beginning of the file and process it line by line (see the sketch after this list).
- If you can get lines with `client#read`, just send each line directly to the codec, as in the first sketch above.
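For the first option, here is a sketch of what `#parse_events` could look like. This is only my guess at the fix, keeping the names from your snippet; the key change is the `io.rewind` between writing and reading:

```ruby
require "fileutils"

def parse_events(filename, out_queue)
  FileUtils.mkdir_p("/tmp/logstash-input-webhdfs")
  File.open("/tmp/logstash-input-webhdfs/tmp_#{filename.to_i(32)}", "w+") do |io|
    io.write(@client.read(filename))
    io.rewind # move the pointer back to the beginning before reading
    io.each_line do |line|
      @codec.decode(line) do |event|
        event["host"] = "#{@host}:#{@port}"
        event["filename"] = filename
        decorate(event)
        @logger.debug("Event : #{event}")
        out_queue << event
      end
    end
  end
end
```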