I currently have a setup where Filebeat watches a folder (which contains only one file) and sends its entries to Logstash. In my logstash.yml, I have pipeline.workers set to 1. Here is a snippet from the "filter" section of my logstash.conf:
    mutate {
      add_field => {"Entry_time" => "%{[Entry_time_array][0]}"}
      add_field => {"Data_Added_Time" => "%{@timestamp}"}
    }
    date {
      match => ["Entry_time", "MM/dd/yyyy HH:mm:ss:SSS"]
      target => "@timestamp"
    }
    grok {
      match => { "Entry_msg" => "TIMING: 'Load Plan Complete', Plan ID '%{NUMBER:taskID}'" }
      add_tag => [ "loadplan" ]
    }
    ruby {
      init => "@taskID = -1"
      code => "
        if !event.get('taskID').nil?
          @taskID = event.get('taskID')
        end
        event.set('taskID', @taskID)
      "
    }
    grok {
      match => {"Entry_msg" => "%{GREEDYDATA:logMessage}"}
    }
    grok {
      match => { "Entry_msg" => "UI::Received On Treatment Delivery Completed Event" }
      add_tag => [ "treatmentcomplete" ]
    }
    grok {
      match => { "Entry_msg" => "Successful Open Delivery Session without Tracking" }
      add_tag => [ "treatmentstart" ]
    }
    ruby {
      code => "
        event.set('currentEpochSetTime', Time.now)
        event.set('currentEpoch', event.get('@timestamp').to_i)
      "
    }
    if [logMessage] =~ "TIMING: 'Load Plan Complete'.*" {
      aggregate {
        aggregate_maps_path => "./aggregate_maps"
        task_id => "%{taskID}"
        code => "
          map['onTableStart'] ||= 0
          map['onTableStart'] = event.get('currentEpoch')
          event.set('onTableStartIs', map['onTableStart'])
          event.set('mapUpdateTime', Time.now)
        "
      }
    }
    if [logMessage] =~ "Successful Open Delivery Session.*" {
      aggregate {
        task_id => "%{taskID}"
        code => "
          map['treatmentStart'] ||= 0
          map['treatmentStart'] = event.get('currentEpoch')
          event.set('treatmentStartIs', map['treatmentStart'])
          event.set('mapUpdateTime', Time.now)
        "
      }
    }
    if [logMessage] =~ ".*Received On Treatment Delivery Completed.*" {
      aggregate {
        task_id => "%{taskID}"
        code => "
          event.set('onTableStartIs', map['onTableStart'])
          event.set('treatmentStartIs', map['treatmentStart'])
          event.set('onTableTime', event.get('currentEpoch') - map['onTableStart'])
          event.set('treatmentTime', event.get('currentEpoch') - map['treatmentStart'])
          event.set('calculationTime', Time.now)
        "
        map_action => "update"
        end_of_task => true
      }
    } else {
      ruby {
        code => "event.set('isNotCompletedEvent', 50)"
      }
    }
The mutate and date filters at the top (along with some earlier filters that are not shown) make "@timestamp" hold the time specified in the entry (which can be any time), and "Data_Added_Time" hold the time at which Logstash processed the entry.
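As a rough, plain-Ruby illustration of what happens to "@timestamp" here (this is not Logstash code), the sketch below parses an Entry_time string in the same MM/dd/yyyy HH:mm:ss:SSS layout using the Ruby strptime equivalent `%m/%d/%Y %H:%M:%S:%L`, then takes whole epoch seconds with `to_i`, analogous to `event.get('@timestamp').to_i` in the ruby filter. It assumes the entries are in UTC (the date filter itself would use its `timezone` setting), and the helper name `entry_epoch` and the sample timestamp are hypothetical.

```ruby
require 'time'

# Ruby strptime equivalent of the date filter pattern "MM/dd/yyyy HH:mm:ss:SSS".
# %L parses the milliseconds; the appended " +0000" / %z pins the zone to UTC
# (an assumption made for this sketch).
ENTRY_TIME_FORMAT = '%m/%d/%Y %H:%M:%S:%L %z'

# Hypothetical helper: parse an Entry_time string into a Time object.
def entry_epoch(entry_time)
  Time.strptime("#{entry_time} +0000", ENTRY_TIME_FORMAT)
end

t = entry_epoch('03/14/2020 10:15:30:250')
puts t.to_i # whole epoch seconds; the milliseconds are truncated
puts t.usec # the fractional part that to_i drops, in microseconds
```

One consequence worth keeping in mind: because `to_i` truncates, differences computed from `currentEpoch` are in whole seconds, so sub-second gaps between the three entries are lost.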
As for the rest of the config file, I'm expecting each entry from Filebeat to be one of three messages:
- Load Plan Complete
- Successful Open Delivery Session
- Received On Treatment Delivery Completed Event
These three entries always arrive in this order. My goal is to calculate the differences between their @timestamps: from the first entry to the third, and from the second entry to the third.
To accomplish this, I am using the aggregate filter's maps to track values across multiple entries. The idea is that when I read one of the first two entries, I store a "start time" in the map, and when I read the last entry, I use the values in the map to calculate the time differences.
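The map-based approach described above can be modelled outside Logstash in a few lines of plain Ruby. This is a simplified sketch, not the aggregate filter's actual implementation: a Hash keyed by task ID stands in for the aggregate maps, the taskID carry-forward mirrors the ruby filter in the config, and the class name `TimingTracker`, the sample messages, and the epoch values are hypothetical. It processes events strictly one at a time in arrival order, which is exactly the property the question depends on.

```ruby
# Plain-Ruby model of the pipeline logic (hypothetical; not Logstash itself).
class TimingTracker
  def initialize
    @maps = {}    # task_id => map, standing in for the aggregate filter's maps
    @task_id = -1 # mirrors init => "@taskID = -1" in the ruby filter
  end

  # event: Hash with 'logMessage', 'currentEpoch' and, for the first
  # message only, 'taskID'. Returns the event with any computed fields.
  def process(event)
    # Carry the last seen taskID forward to messages that lack one.
    @task_id = event['taskID'] unless event['taskID'].nil?
    event['taskID'] = @task_id
    msg = event['logMessage']

    if msg.match?(/TIMING: 'Load Plan Complete'/)
      (@maps[@task_id] ||= {})['onTableStart'] = event['currentEpoch']
    elsif msg.match?(/Successful Open Delivery Session/)
      (@maps[@task_id] ||= {})['treatmentStart'] = event['currentEpoch']
    elsif msg.match?(/Received On Treatment Delivery Completed/)
      map = @maps[@task_id] # map_action => "update": only use an existing map
      if map
        event['onTableTime'] = event['currentEpoch'] - map['onTableStart']
        event['treatmentTime'] = event['currentEpoch'] - map['treatmentStart']
        @maps.delete(@task_id) # end_of_task => true
      end
    end
    event
  end
end

tracker = TimingTracker.new
tracker.process({ 'logMessage' => "TIMING: 'Load Plan Complete', Plan ID '42'",
                  'taskID' => '42', 'currentEpoch' => 100 })
tracker.process({ 'logMessage' => 'Successful Open Delivery Session without Tracking',
                  'currentEpoch' => 130 })
done = tracker.process({ 'logMessage' => 'UI::Received On Treatment Delivery Completed Event',
                         'currentEpoch' => 190 })
p done.values_at('onTableTime', 'treatmentTime') # => [90, 60]
```

In this sequential model the third message always sees the map written by the first two; if events were instead handled concurrently or reordered, the calculation could read a map that has not been updated yet.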
The problem is that I need to guarantee that the map is updated for messages 1 and 2 BEFORE the calculation happens for message 3, and apparently this is not guaranteed. To check, I added a couple of debug fields inside my if-statements: for the first two messages, "mapUpdateTime" marks when the map is updated; for the third message, "calculationTime" marks when the time-difference calculation occurred.
So far, Data_Added_Time always appears in the correct order, which suggests the log entries are being processed in the correct order. However, calculationTime is sometimes earlier than mapUpdateTime, even though calculationTime belongs to the third message and should therefore always come AFTER mapUpdateTime.
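The out-of-order condition described above can be checked mechanically over the output for a single task. The sketch below is plain Ruby and assumes the events have already been exported as hashes with "mapUpdateTime" and "calculationTime" available as Ruby Time objects; the helper name `out_of_order?` and the sample times are hypothetical.

```ruby
# Hypothetical check: true when the earliest calculationTime among one task's
# events is earlier than its latest mapUpdateTime, i.e. the third message's
# calculation ran before one of the map updates it depends on.
def out_of_order?(events)
  updates = events.map { |e| e['mapUpdateTime'] }.compact
  calcs = events.map { |e| e['calculationTime'] }.compact
  return false if updates.empty? || calcs.empty?

  calcs.min < updates.max
end

base = Time.at(1_600_000_000)
in_order = [
  { 'mapUpdateTime' => base },
  { 'mapUpdateTime' => base + 1 },
  { 'calculationTime' => base + 2 }
]
reordered = [
  { 'mapUpdateTime' => base },
  { 'mapUpdateTime' => base + 2 },
  { 'calculationTime' => base + 1 }
]
p out_of_order?(in_order)  # => false
p out_of_order?(reordered) # => true
```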
My guess so far is that there's still some multithreading going on, but as I mentioned before, I've already set pipeline.workers to 1. I've also tried using the elapsed filter, but that did not work either.
I'm at a loss as to what to do, so any help at all would be greatly appreciated. Thanks!