EOF in Logstash File Input


I am using Logstash to aggregate some data out of a log file and store it in Elasticsearch, to collect usage data for a piece of software.

We copy the log files into a directory that Logstash watches. We use the file input plugin in read mode and delete the files from the directory afterwards.

file {
	mode => "read"
	file_completed_action => "delete"
	path => "path/to/directory/with/logs/*"
}

For the data aggregation I would need to know when the file ends. Is there any way in Logstash to detect that EOF was reached?
For instance, can I automatically generate an EOF event?
Or detect that an event is the last one in the file?


Not really. If you generate the document_id yourself then you could use an aggregate filter to track the most recent record from each file, and then generate an upsert to tag the last record after a timeout.
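A minimal sketch of that idea (the field names [log][file][path], [@metadata][document_id], and last_id are assumptions for illustration, not from your setup): keep one aggregate map per source file, remember the most recent document_id in it, and let the timeout event carry that id so a later upsert can tag the record:

aggregate {
	task_id => "%{[log][file][path]}"       # assumed: one map per source file
	code => "
		map['last_id'] = event.get('[@metadata][document_id]')   # remember the most recent record's id
	"
	push_map_as_event_on_timeout => true
	timeout => 120                          # seconds of silence before we treat the file as finished
	timeout_code => "
		event.set('tags', ['eof'])          # mark the synthetic timeout event
	"
}

Because push_map_as_event_on_timeout copies the map's entries onto the timeout event as fields, that event then carries last_id, which an elasticsearch output using document_id and doc_as_upsert could use to tag the final record of the file.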

Thank you for the tip. I am using the timeout event function of the aggregate filter as you suggested.

However, I have an issue where the event is not stored in the correct index.

I define the index in an event like this:

mutate {
	add_field => { "[@metadata][index]" => "%{[@metadata][beat]}-%{[@metadata][version]}-%{[processor][event]}-%{+yyyy.MM.dd}" }
}

In the output I use this as the index:

output {
	elasticsearch {
		hosts => [""]
		user => "elastic"
		password => "changeme"
		index => "%{[@metadata][index]}"
	}
	stdout { codec => json }
}

For the timeout event I add the same index as for the other events:

aggregate {
	task_id => "%{[observer][id]}"
	code => "
		map['index'] = event.get('[@metadata][index]')
	"
	timeout => 120
	push_map_as_event_on_timeout => true
	timeout_code => "
		event.set('[@metadata][index]', map['index'])
		# aggregate other data
	"
}

However, the timeout event is stored with the literal string "%{[@metadata][index]}" as its index instead of the resolved value. The other events are stored in the correct index.
The event in Elasticsearch then has two fields:
_index: %{[@metadata][index]}
index: (with the correct value)

So I guess I am addressing something wrong here, but I cannot figure out what.
Does anyone know what is going on there?

Thanks again!

When the timeout_code executes, the map has already been converted into an event (in create_previous_map_as_event, the .shift that removes the map happens before the call to create_timeout_event). Try

event.set('[@metadata][index]', event.get('index'))
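Applied to the filter above, a corrected sketch of the aggregate block would read the value back off the event itself:

aggregate {
	task_id => "%{[observer][id]}"
	code => "
		map['index'] = event.get('[@metadata][index]')
	"
	timeout => 120
	push_map_as_event_on_timeout => true
	timeout_code => "
		# the map's entries have already been copied onto the timeout event
		# as top-level fields, so read 'index' from the event, not from map[...]
		event.set('[@metadata][index]', event.get('index'))
	"
}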

Ahh okay, now it all makes sense.
Thank you!
