EOF in Logstash File Input


I am using Logstash to aggregate some data out of a log file and store it in Elasticsearch, to collect usage data for a piece of software.

We copy the log files into a directory that Logstash watches. We use the file input plugin in read mode and delete the files from the directory afterwards.

file {
	mode => "read"
	file_completed_action => "delete"
	path => "path/to/directory/with/logs/*"
}

For the data aggregation I would need to know when the file ends. Is there any way in Logstash to detect that EOF was reached?
For instance, can I automatically generate an EOF event?
Or detect that an event is the last one in the file?


Not really. If you generate the document_id yourself then you could use an aggregate filter to track the most recent record from each file, and then generate an upsert to tag the last record after a timeout.
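A minimal sketch of that idea (the field names [log][file][path], [@metadata][document_id], and last_id are assumptions for illustration, not from your setup): keep one aggregate map per source file, remember the most recent document_id in it, and let the timeout event carry that id so a later upsert can tag the record:

aggregate {
	task_id => "%{[log][file][path]}"       # assumed: one map per source file
	code => "
		map['last_id'] = event.get('[@metadata][document_id]')   # remember the most recent record's id
	"
	push_map_as_event_on_timeout => true
	timeout => 120                          # seconds of silence before we treat the file as finished
	timeout_code => "
		event.set('tags', ['eof'])          # mark the synthetic timeout event
	"
}

Because push_map_as_event_on_timeout copies the map's entries onto the timeout event as fields, that event then carries last_id, which an elasticsearch output using document_id and doc_as_upsert could use to tag the final record of the file.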

Thank you for the tip. I am using the timeout event function of the aggregate filter as you suggested.

However, I have an issue where the event is not stored in the correct index.

I define the index in an event like this:

mutate {
	add_field => { "[@metadata][index]" => "%{[@metadata][beat]}-%{[@metadata][version]}-%{[processor][event]}-%{+yyyy.MM.dd}" }
}

In the output I use this as the index:

output {
	elasticsearch {
		hosts => [""]
		user => "elastic"
		password => "changeme"
		index => "%{[@metadata][index]}"
	}
	stdout { codec => json }
}

For the timeout event I add the same index as for the other events:

aggregate {
	task_id => "%{[observer][id]}"
	code => "
		map['index'] = event.get('[@metadata][index]')
	"
	timeout => 120
	push_map_as_event_on_timeout => true
	timeout_code => "
		event.set('[@metadata][index]', map['index'])
		# aggregate other data
	"
}

However, the timeout event is stored with the literal string "%{[@metadata][index]}" as its index instead of the resolved value. The other events are stored in the correct index.
The event in Elasticsearch then has two fields:
_index: %{[@metadata][index]}
index: (with the correct value)

So I guess I am addressing something wrong here, but I cannot figure out what.
Does anyone know what is going on there?

Thanks again!

When the timeout_code executes, the map has already been converted into an event (in create_previous_map_as_event, the .shift that removes the map happens before the call to create_timeout_event). Try

event.set('[@metadata][index]', event.get('index'))
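Applied to the filter above, a corrected sketch of the aggregate block would read the value back off the event itself:

aggregate {
	task_id => "%{[observer][id]}"
	code => "
		map['index'] = event.get('[@metadata][index]')
	"
	timeout => 120
	push_map_as_event_on_timeout => true
	timeout_code => "
		# the map's entries have already been copied onto the timeout event
		# as top-level fields, so read 'index' from the event, not from map[...]
		event.set('[@metadata][index]', event.get('index'))
	"
}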

Ahh okay, now it all makes sense.
Thank you!
