How to close the file stream once the message has been pushed?

Hi,

My requirement is to fetch a remote file using Beats and write it to HDFS using Logstash.

I cannot use webhdfs for the output, so I used the pipe output with hdfs dfs -appendToFile to push the data.

This is how I am passing the output:

input {
  beats {
    port => 5044
  }
}

filter {
  grok {
    match => ["source","%{GREEDYDATA}/%{GREEDYDATA:filename}\.xml"]
  }
}

output {
  pipe {
    command => "hdfs dfs -appendToFile - /user/srini/%{filename}.xml"
    message_format => "%{message}"
  }
}

The HDFS files get opened and data is written to them. However, the files are kept open, showing 0 bytes of data, until I stop Logstash. I think it might be expecting more output. How can I roll the file after the event?

With its default value, the pipe output's ttl option should close the pipe after 10 seconds of not receiving any events. Can you try enabling debug logging by starting Logstash with --debug? The plugin will then log extra messages when it closes the pipes.
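
For reference, here is a minimal sketch of the output with ttl set explicitly, reusing the command from the question. The value 10 is meant to match the default, so this should only make the setting visible rather than change the behavior:

```
output {
  pipe {
    command => "hdfs dfs -appendToFile - /user/srini/%{filename}.xml"
    message_format => "%{message}"
    # Seconds a pipe may stay idle before the plugin closes it
    # (10 is assumed here to equal the default).
    ttl => 10
  }
}
```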

Hi @magnusbaeck,

I have started Logstash with --debug and added ttl => 10 just to make sure it is 10 seconds.

There is no apparent error in the trace. Below is the stack trace.

Opening pipe {:command=>"hdfs dfs -appendToFile - /user/input2/srini.xml", :level=>:info, :file=>"logstash/outputs/pipe.rb", :line=>"100", :method=>"get_pipe"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"450", :method=>"flush"}
16/03/31 15:25:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting stale pipes cleanup cycle {:pipes=>{"hdfs dfs -appendToFile - /user/netconf2/srini.xml"=>#<PipeWrapper:0x67ea2fc8 @pipe=#<IO:fd 483>, @active=true>}, :level=>:info, :file=>"logstash/outputs/pipe.rb", :line=>"75", :method=>"close_stale_pipes"}
0 stale pipes found {:inactive_pipes=>{}, :level=>:debug, :file=>"logstash/outputs/pipe.rb", :line=>"77", :method=>"close_stale_pipes"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"450", :method=>"flush"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"450", :method=>"flush"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"450", :method=>"flush"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"450", :method=>"flush"}

Are you sure the pipe isn't getting events constantly? What if you replace the beats input with a stdin or something else where you control exactly which events are emitted?
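
A minimal sketch of such a test, assuming the filter and output sections stay unchanged. The stdin input does not set the source field that the grok filter reads, so the add_field value below is a made-up path used only for illustration:

```
input {
  stdin {
    # Hypothetical path standing in for the "source" field
    # that Beats would normally set on each event.
    add_field => { "source" => "/tmp/test/srini.xml" }
  }
}
```

Each line typed on the console becomes one event, so after a single line the pipe should go idle, and the debug log should then show whether the cleanup cycle closes it.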

(The log snippet above is not a stacktrace.)