Hi,
I need a plugin that can read log files from HDFS. I took the output-webhdfs plugin and changed it to download files from HDFS through its REST API, then parse these temporary files with an adequate codec.
For now I can connect to HDFS and get the files, but I have some trouble using the json_lines codec to parse them. I will share my code; I need your help to figure out what I'm doing wrong, and possibly some recommendations. code here
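For reference, the download step might look something like this with the webhdfs gem (the client library the output-webhdfs plugin uses); the host, port, and paths below are placeholders, and this is just a sketch, not the actual plugin code:

require 'webhdfs'

# Placeholder connection details; adjust to your cluster.
client = WebHDFS::Client.new('namenode.example.com', 50070)

# List a log directory and fetch each file's contents over the REST API.
client.list('/logs/app').each do |status|
  content = client.read("/logs/app/#{status['pathSuffix']}")
  # content is the raw file body, ready to be handed to a codec
end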
What is the structure of the documents you are fetching from HDFS? Is it one JSON document per line? In that case the json codec might be more appropriate: make your input code extract one line at a time and send it to the codec's decode method.
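For example (a minimal sketch, assuming @codec is the plugin's configured json codec instance and out_queue is the plugin's event queue):

# One JSON document per line: decode each line individually.
line = '{"level":"INFO","message":"job started"}'  # example document
@codec.decode(line) do |event|
  out_queue << event  # one LogStash::Event per decoded document
end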
I've taken a quick look at the code. I don't know the details of the library, but I can give you this recommendation.
I think there is a small problem in the #parse_events method: you read from the client with @client.read, write the result to a file, and then try to read back from the file you are currently writing. After the write, the file position is at the end, so io.read returns an empty string.
def parse_events(filename, out_queue)
  FileUtils::mkdir_p "/tmp/logstash-input-webhdfs"
  File.open("/tmp/logstash-input-webhdfs/tmp_#{filename.to_i(32)}", 'w+') do |io|
    io.write(@client.read(filename))
    @codec.decode(io.read) do |event|
      event["host"] = @host+":"+@port
      event["filename"] = filename
      decorate(event)
      @logger.debug("Event : #{event}")
      out_queue << event
    end
  end
end # def parse_events
I don't know what @client.read returns; is it a line or a chunk of text? I think you could solve this problem in two different ways:

1. Read the file from the service and write it to disk, then read the file from the beginning and process each line (see the sketch below).
2. If you can get individual lines with client#read, send each line directly to the codec.
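For the first approach, the fixed method could look roughly like this (a sketch based on the code above; the key changes are rewinding the temp file after writing, so reads start at the beginning, and feeding the codec one line at a time):

def parse_events(filename, out_queue)
  FileUtils::mkdir_p "/tmp/logstash-input-webhdfs"
  File.open("/tmp/logstash-input-webhdfs/tmp_#{filename.to_i(32)}", 'w+') do |io|
    io.write(@client.read(filename))
    io.rewind  # without this, the file position is at the end and nothing is read back
    io.each_line do |line|
      @codec.decode(line) do |event|
        event["host"] = @host + ":" + @port
        event["filename"] = filename
        decorate(event)
        out_queue << event
      end
    end
  end
end # def parse_events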
Thank you very much for your help. I changed the codec to json and followed the first approach (write to a file, then read it line by line), and it works. But I still get a warning when inserting into Elasticsearch.
My input files look like this: input file
Warning message:
[2017-04-03T16:36:41,232][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2017.04.03", :_type=>"logs", :_routing=>nil}, 2017-04-03T14:36:41.065Z %{host} %{message}], :response=>{"index"=>{"_index"=>"logstash-2017.04.03", "_type"=>"logs", "_id"=>"AVs0PxVnWIlWG4bx6UaU", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Can't merge a non object mapping [Properties.spark.rdd.scope] with an object mapping [Properties.spark.rdd.scope]"}}}}
[2017-04-03T16:36:41,232][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2017.04.03", :_type=>"logs", :_routing=>nil}, 2017-04-03T14:36:41.066Z %{host} %{message}], :response=>{"index"=>{"_index"=>"logstash-2017.04.03", "_type"=>"logs", "_id"=>"AVs0PxVnWIlWG4bx6UaV", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Can't merge a non object mapping [Properties.spark.rdd.scope] with an object mapping [Properties.spark.rdd.scope]"}}}}