Logstash Plugins: input-webhdfs

I need a plugin able to read log files from HDFS. I took the output-webhdfs plugin and changed it to download files from HDFS through its REST API, then parse these temporary files with an adequate codec.

For now I can connect to HDFS and get the files, but I have some trouble using the json_lines codec to parse them. I will share my code; I need your help to figure out what I'm doing wrong, and I'd welcome any recommendations. code here

Thank you in advance

Hello @ohk,

What exactly are the symptoms you are seeing?

  • Incorrectly parsed data?
  • Logstash hang?
  • Any errors in the log?

What is the structure of the documents you are fetching from HDFS? Is it one JSON document per line? In that case the json codec might be more appropriate, and you can make your input code extract one line at a time and send it to the codec's decode method.
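To illustrate the "one JSON document per line" idea outside of Logstash, here is a minimal sketch in plain Ruby. The method name `decode_lines` and the queue are illustrative, not the plugin's actual API; `JSON.parse` stands in for what the json codec's decode method does per line.

```ruby
require "json"

# Illustrative sketch (not the plugin API): split the fetched content
# into lines and decode each line as its own JSON document, the way a
# json codec fed one line at a time would.
def decode_lines(content, out_queue)
  content.each_line do |line|
    line = line.strip
    next if line.empty?        # skip blank lines
    out_queue << JSON.parse(line)
  end
end

queue = []
decode_lines(%Q({"a":1}\n{"a":2}\n), queue)
```

After this call, `queue` holds two separate decoded documents rather than one failed parse of the whole blob.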

I've taken a quick look at the code; I don't know the details of the library, but I can give you this recommendation.

I think there is a small problem in the #parse_events method: you read from the client with @client.read, write to a file, and then read back from the file that you are currently writing?

  def parse_events(filename, out_queue)
    FileUtils::mkdir_p "/tmp/logstash-input-webhdfs"
    File.open("/tmp/logstash-input-webhdfs/tmp_#{filename.to_i(32)}", 'w+') do |io|
      @codec.decode(io.read) do |event|
        event["host"] = @host + ":" + @port
        event["filename"] = filename
        @logger.debug("Event : #{event}")
        out_queue << event
      end
    end
  end # def parse_events

I don't know what @client.read returns — a line or a chunk of text? I think you could solve this problem in two different ways:

  • Read the file from the service, write it fully to disk, then read the file from the beginning and process each line.
  • If you can get lines with client#read, just send each line directly to the codec.
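The first approach above can be sketched like this in plain Ruby. This is a hedged sketch, not the plugin's code: `content` stands in for whatever `@client.read` returns, and `JSON.parse` stands in for the codec; the key point is to finish writing and rewind before reading.

```ruby
require "json"
require "tempfile"

# Sketch of approach 1: write everything to disk first, then reopen
# from the beginning and process line by line. `content` is assumed to
# be the full text fetched from HDFS (stand-in for @client.read).
def process_remote_file(content, out_queue)
  Tempfile.create("logstash-input-webhdfs") do |io|
    io.write(content)   # 1) write the whole fetched file first
    io.flush
    io.rewind           # 2) only then read from the beginning
    io.each_line do |line|
      out_queue << JSON.parse(line)
    end
  end                   # Tempfile.create removes the file afterwards
end
```

The decisive difference from the original #parse_events is the `flush`/`rewind` between the write phase and the read phase: the original opened the file with 'w+' (truncating it) and immediately called `io.read` on the empty file.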

Thank you very much for your help. I changed the codec to json and I followed the first approach (write to file, then read the file line by line) and it works. But I still have a warning when inserting into Elasticsearch.
My input files look like this: input file
Warning message:

[2017-04-03T16:36:41,232][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2017.04.03", :_type=>"logs", :_routing=>nil}, 2017-04-03T14:36:41.065Z %{host} %{message}], :response=>{"index"=>{"_index"=>"logstash-2017.04.03", "_type"=>"logs", "_id"=>"AVs0PxVnWIlWG4bx6UaU", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Can't merge a non object mapping [Properties.spark.rdd.scope] with an object mapping [Properties.spark.rdd.scope]"}}}}
[2017-04-03T16:36:41,232][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2017.04.03", :_type=>"logs", :_routing=>nil}, 2017-04-03T14:36:41.066Z %{host} %{message}], :response=>{"index"=>{"_index"=>"logstash-2017.04.03", "_type"=>"logs", "_id"=>"AVs0PxVnWIlWG4bx6UaV", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Can't merge a non object mapping [Properties.spark.rdd.scope] with an object mapping [Properties.spark.rdd.scope]"}}}}

I think it's linked to the mapping in Elasticsearch: the error suggests the field Properties.spark.rdd.scope arrives sometimes as an object and sometimes as a plain value, and the two shapes can't be merged into one mapping.
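One possible workaround (a sketch only — the exact field path depends on how your documents nest, and the path below is an assumption taken from the error message) is to normalize the conflicting field to a string before indexing, for example with a ruby filter in the Logstash pipeline config:

```
filter {
  ruby {
    # Assumed path: a "spark.rdd.scope" key inside "Properties".
    # If the value is sometimes an object and sometimes a string,
    # serialize the object form so the field always maps as text.
    code => '
      v = event.get("[Properties][spark.rdd.scope]")
      event.set("[Properties][spark.rdd.scope]", v.to_json) if v.is_a?(Hash)
    '
  }
}
```

Note that an index whose mapping already conflicts keeps rejecting documents until a new index (or reindex) is created with the normalized shape.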

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.