Making Logstash and/or Filebeat process one entry at a time

I currently have a setup where Filebeat is tailing a folder (which contains only one file) and sending entries to Logstash. In my logstash.yml, I have pipeline.workers set to 1. Here is a snippet of my logstash.conf, from the "filter" section.

# Copy the parsed entry time out of the array, and record the time
# Logstash first saw this event (the current @timestamp).
mutate {
	add_field => {
		"Entry_time" => "%{[Entry_time_array][0]}"
		"Data_Added_Time" => "%{@timestamp}"
	}
}

# Overwrite @timestamp with the time recorded in the entry itself.
date {
	match => ["Entry_time", "MM/dd/yyyy  HH:mm:ss:SSS"]
	target => "@timestamp"
}

# Tag the "Load Plan Complete" entry and capture its plan ID as taskID.
grok {
	match => { "Entry_msg" => "TIMING: 'Load Plan Complete', Plan ID '%{NUMBER:taskID}'" }
	add_tag => [ "loadplan" ]
}

# Remember the most recent taskID in filter-local state (@taskID) and
# stamp it onto events that don't carry one. This only works reliably
# with pipeline.workers set to 1, since the state lives in this single
# filter instance.
ruby {
	init => "@taskID = -1"
	code => "
		if !event.get('taskID').nil?
			@taskID = event.get('taskID')
		else
			event.set('taskID', @taskID)
		end
	"
}
# Copy the raw message into logMessage for the conditionals below.
grok {
	match => { "Entry_msg" => "%{GREEDYDATA:logMessage}" }
}
# Tag the "treatment completed" and "treatment start" entries.
grok {
	match => { "Entry_msg" => "UI::Received On Treatment Delivery Completed Event" }
	add_tag => [ "treatmentcomplete" ]
}
grok {
	match => { "Entry_msg" => "Successful Open Delivery Session without Tracking" }
	add_tag => [ "treatmentstart" ]
}

# Record the wall-clock time this event was processed, plus the event's
# timestamp as epoch seconds for the arithmetic below.
ruby {
	code => "
		event.set('currentEpochSetTime', Time.now)
		event.set('currentEpoch', event.get('@timestamp').to_i)
	"
}

if [logMessage] =~ "TIMING: 'Load Plan Complete'.*"{
	aggregate{
		aggregate_maps_path => "./aggregate_maps"
		task_id => "%{taskID}"
		code => "
			map['onTableStart'] ||= 0
			map['onTableStart'] = event.get('currentEpoch')
			event.set('onTableStartIs', map['onTableStart'])
			event.set('mapUpdateTime', Time.now)
		"
	}
}

if [logMessage] =~ "Successful Open Delivery Session.*"{
	aggregate{
		task_id => "%{taskID}"
		code => "
			map['treatmentStart'] ||= 0
			map['treatmentStart'] = event.get('currentEpoch')
			event.set('treatmentStartIs', map['treatmentStart'])
			event.set('mapUpdateTime', Time.now)
		"
	}
}

if [logMessage] =~ ".*Received On Treatment Delivery Completed.*" {
	aggregate {
		task_id => "%{taskID}"
		code => "
			event.set('onTableStartIs', map['onTableStart'])
  			event.set('treatmentStartIs', map['treatmentStart'])
			event.set('onTableTime', event.get('currentEpoch') - map['onTableStart'])
  			event.set('treatmentTime', event.get('currentEpoch') - map['treatmentStart'])
			event.set('calculationTime', Time.now)
		"
		map_action => "update"
		end_of_task => true
	}
} else {
	ruby{
		code => "event.set('isNotCompletedEvent', 50)"
	}
}

The mutate and date filters at the top (along with some earlier filters that are not shown) make it so that "@timestamp" holds the time specified by the entry itself (which can be anything), while "Data_Added_Time" holds the time at which the entry was processed by Logstash.
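For example (hypothetical values), an entry whose Entry_time_array[0] is 01/02/2020  13:45:30:123 would come out of those two filters looking roughly like this (the exact @timestamp depends on the date filter's timezone handling):

	Entry_time      => "01/02/2020  13:45:30:123"
	@timestamp      => 2020-01-02T13:45:30.123 (parsed from Entry_time)
	Data_Added_Time => whenever Logstash ingested the event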

For the rest of the config file:

I'm expecting three kinds of entries to come from Filebeat:

  1. Load Plan Complete
  2. Successful Open Delivery Session
  3. Received On Treatment Delivery Completed Event

These three entries will always arrive in this order. My goal is to calculate the difference in their @timestamps.

To accomplish this, I am using aggregate maps to track values across multiple entries. The idea is that when I read one of the first two entries, I update the map with a "start time", and when I read the last entry, I use the values in the map to calculate the time differences.
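Distilled to its essentials, the pattern is this (a condensed sketch of what the full config above does, with a made-up map key):

	# Events 1 and 2: write a start time into the shared map for this taskID.
	aggregate {
		task_id => "%{taskID}"
		code => "map['start'] = event.get('currentEpoch')"
	}

	# Event 3: read the start time back out and compute the difference.
	aggregate {
		task_id => "%{taskID}"
		code => "event.set('delta', event.get('currentEpoch') - map['start'])"
		end_of_task => true
	}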

The problem is that I need to guarantee that the map is updated for messages 1 and 2 BEFORE the calculation happens for message 3. Apparently, this is not guaranteed. I added a couple of debug fields to my if-statements. For the first two messages, I added "mapUpdateTime", which marks when the map is updated. For the third message, I added "calculationTime", which marks when the time-difference calculation occurred.

So far, Data_Added_Time always appears in the correct order, meaning the log entries are being picked up in the correct order. However, calculationTime sometimes comes before mapUpdateTime, even though calculationTime belongs to the third message and should therefore occur AFTER mapUpdateTime.

My guess so far is that there is still some multithreading going on, but as I mentioned, I've already set pipeline.workers to 1. I've also tried the elapsed filter (roughly as sketched below), but that did not work either.
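For reference, the elapsed attempt looked roughly like this (a reconstruction based on the tags from the groks above; the exact options I used may have differed):

	elapsed {
		start_tag => "loadplan"
		end_tag => "treatmentcomplete"
		unique_id_field => "taskID"
		timeout => 600
	}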

I'm at a loss on what to do, so any help at all would be greatly appreciated. Thanks!

I have not read the whole post in detail, so this might be rubbish (it is late for me), but by default the Logstash pipeline processes events in batches: 125 events go through the first filter, then those 125 go through the second, and so on. If you set pipeline.batch.size to 1, it might help this particular pipeline, but it might also ruin the scalability of other pipelines. Be careful about setting this without measuring throughput if you have any concerns around scalability.
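Concretely, in logstash.yml that would be something like:

	pipeline.workers: 1
	pipeline.batch.size: 1   # default is 125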

Oh, yeah, I do also have the batch size set to 1. It doesn't seem to have any effect on my problem.

What version of Logstash?

Logstash version is 7.9.0

Filebeat and Elasticsearch versions are also 7.9.0

And also Kibana: 7.9.0

OK, so pipeline.ordered should be auto, and since pipeline.workers is 1 the pipeline should preserve order. I cannot think of anything else that would result in events getting re-ordered. Can you try setting pipeline.java_execution to false, just in case?
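In logstash.yml, the full set of settings being discussed here would then look like this (a sketch):

	pipeline.workers: 1
	pipeline.batch.size: 1
	pipeline.ordered: auto          # enforces ordering when workers == 1
	pipeline.java_execution: false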

pipeline.ordered was set to true, but I now have it set to auto. java_execution was already set to false.

It didn't change anything.

But I want to clarify that the issue is not that the events are getting re-ordered. The ordering is preserved. The issue is that a previous event is not being processed to completion before the next event begins processing.

How can you tell?

I can tell because mapUpdateTime for the second event happens after Data_Added_Time for the third event. If the second event were processed to completion first, then mapUpdateTime for the second event would happen BEFORE Data_Added_Time for the third event.

I have not found a solution to this issue yet. Any additional help would be greatly appreciated.
