Logstash fails to aggregate documents with similar timestamps

I have Logstash running on a server, receiving its input from Filebeat and sending its output to Elasticsearch. The inputs are of two types: "data arrival" and "sent data". Each input of type "data arrival" should be paired with a specific input of type "sent data". Logstash does this pairing by aggregating the inputs that have a common value for the field "port number". The documents written in the Elasticsearch index merge the information contained in the paired "data arrival" and "sent data" inputs. In other words, these documents should not present the "data arrival" input individually (and similarly for the "sent data" input).

I have noticed that when the documents in the index have very similar timestamps, sometimes the "data arrival" and "sent data" appear separately. For instance, I have documents with timestamps 23:07:17.356, 23:07:17.361, and 23:07:17.372. The first and last contain paired information, but the document in between (i.e., 23:07:17.361) only contains the "data arrival" information. The associated "sent data" information appears in a separate document, with timestamp 23:08:22.359. This does not occur when the documents have timestamps separated by more than one minute (which corresponds to 95% of the cases).

Assuming that the logic in the Logstash configuration file is correct, how should I deal with this situation? Could a simple lack of resources on the server explain this behavior?

You need to show us the logstash configuration and provide sample messages that it is processing.

Hi @Badger, You're right. Here's my logstash configuration file.

input {
    beats {
	port => "5044"
	}
}


filter {
    mutate { 
	gsub => [ "message", '\u0003', ''] 
	 }


    grok {
        match => { "message" => [
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([R]+))%{DATA:Estacion}#%{DATA:terminal}\-#%{DATA:letitbe}#(?<Tarjeta_part3>\d{0,7})(?<gardening>\d{0,5})(?<part3>\d{0,4})\=%{DATA:Pista2}#%{DATA:bpin}\-%{DATA:Tipo_Transaccion}#%{NUMBER:'kilos'}#%{NUMBER:'potatoes'}#1#%{DATA:diesel}#%{DATA:Opereción-Autorización}#%{DATA:Km}#%{DATA:Código}#%{DATA:Stan}#%{NUMBER:TimeStamp}#%{DATA:Currency}#%{DATA:NumAuthor}#%{DATA:EndShift}#%{DATA:Details}#%{DATA:LicencePlate}#",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([R]+))%{DATA:Estacion}#%{DATA:terminal}\-#%{DATA:letitbe}#(?<Tarjeta_part3>\d{0,7})(?<gardening>\d{0,5})(?<part3>\d{0,4})\=%{DATA:Pista2}#%{DATA:bpin}\-%{DATA:Tipo_Transaccion}#%{DATA:Campo1}#%{DATA:Campo2}#1#%{DATA:diesel}#%{DATA:Campo3}#%{DATA:Campo4}#%{DATA:Campo5}#%{DATA:Campo6}#",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([R]+))%{DATA:Estacion}#%{DATA:terminal}\-#%{DATA:letitbe}#(?<Tarjeta_part3>\d{0,7})(?<gardening>\d{0,5})(?<part3>\d{0,5})\=%{DATA:Pista2}#%{DATA:bpin}\-%{DATA:Tipo_Transaccion}#%{DATA:Campo1}#%{DATA:Campo2}#1#%{DATA:diesel}#%{DATA:Campo3}#%{DATA:Campo4}#%{DATA:Campo5}#%{DATA:Campo6}#",	
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([R]+))%{DATA:Estacion}#%{DATA:terminal}\-#%{DATA:letitbe}#(?<Tarjeta_part3>\d{0,7})(?<gardening>\d{0,5})(?<part3>\d{0,5})\=%{DATA:Pista2}#%{DATA:bpin}\-%{DATA:Tipo_Transaccion}#%{DATA:Campo1}#%{DATA:Campo2}#1#%{DATA:diesel}#",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([R]+))%{DATA:Estacion}#%{DATA:terminal}\-#%{DATA:letitbe}#(?<Tarjeta_part3>\d{0,7})(?<gardening>\d{0,5})(?<part3>\d{0,5})\=%{DATA:Pista2}#%{DATA:bpin}\-%{DATA:Tipo_Transaccion}#%{DATA}#%{DATA}#1#%{DATA:diesel}#%{GREEDYDATA}",       
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([L]+))%{DATA:green}#%{DATA:codigo}#%{DATA:top_secret}#%{DATA:prado}#%{NUMBER:'kilos'}#%{NUMBER:precio}#%{NUMBER:'potatoes'}#1#%{NUMBER:fop}#%{GREEDYDATA}#%{GREEDYDATA:cacauete}",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([R]+))%{DATA:Estacion}#%{DATA:terminal}\-#%{DATA:letitbe}#(?<Tarjeta_part3>\d{0,7})(?<gardening>\d{0,5})(?<part3>\d{0,4})\=%{DATA:Pista2}#%{DATA:bpin}\-%{DATA:Tipo_Transaccion}#%{NUMBER:'kilos'}#%{NUMBER:'potatoes'}#%{DATA:Q}#%{DATA:diesel}#",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([R]+))%{DATA:Estacion}#%{DATA:terminal}\-#%{DATA:letitbe}#(?<Tarjeta_part3>\d{0,7})(?<gardening>\d{0,5})(?<part3>\d{0,4})\=%{DATA:Pista2}#%{DATA:bpin}\-%{DATA:Tipo_Transaccion}#%{NUMBER:'kilos'}#%{DATA:Espacio1}#1#%{DATA:diesel}#%{DATA:Espacio2}#%{DATA:Espacio3}#%{DATA:Espacio4}#%{DATA:Espacio5}#",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([L]+))%{DATA:green}#%{DATA:codigo}#%{DATA:top_secret}#%{DATA:prado}#%{NUMBER:'kilos'}#%{NUMBER:precio}#%{NUMBER:'potatoes'}#1#%{NUMBER:fop}",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([R]+))%{DATA:Estacion}#%{DATA:terminal}\-#%{DATA:letitbe}#(?<Tarjeta_part3>\d{0,7})(?<gardening>\d{0,5})(?<part3>\d{0,4})\=%{DATA:Pista2}#%{DATA:bpin}\-%{GREEDYDATA:Tipo_Transaccion}",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([R]+))%{DATA:Estacion}#%{DATA:terminal}\-#%{DATA:letitbe}#(?<Tarjeta_part3>\d{0,7})(?<gardening>\d{0,5})(?<part3>\d{0,4})\=%{DATA:Pista2}#%{DATA:bpin}\-%{DATA:Tipo_Transaccion}#%{NUMBER:blue}#1#%{DATA:top_secret}#",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([L]+))%{DATA:green}#%{DATA:codigo}#%{DATA:top_secret}#%{DATA:prado}#%{NUMBER:'potatoes'}#1#%{NUMBER:fop}#%{GREEDYDATA}#%{DATA:cacauete}",
		"%{DATESTAMP:timestamp}%{GREEDYDATA}%{IP:HOST} %{NUMBER:src_port:int} (?<rc_head>([L]+))%{DATA:green}#%{DATA:codigo}#%{DATA:top_secret}#%{DATA:prado}#%{NUMBER:'kilos'}#%{NUMBER:precio}#%{NUMBER:'potatoes'}#1#%{NUMBER:fop}"
		]
        }
    }

    if [message] =~ "DataArrival" {
        mutate {
            add_tag => [ "transactionStarted", "parse" ]
            add_field => { "type" => "DataArrival" }
            }
	 ruby {
            code => "event.set(''potatoes'', event.get(''potatoes'').to_f / 100)"
        }
        ruby {
            code => "event.set(''kilos'', event.get(''kilos'').to_f / 100)"
        }
    }

    if [message] =~ "complete" {
        mutate {
            add_tag => [ "transactionTerminated", "parse" ]
            add_field => { "type" => "Send Complete" }
            }

        ruby {
            code => "event.set(''potatoes'', event.get(''potatoes'').to_f / 100)"
        }
        ruby {
            code => "event.set(''kilos'', event.get(''kilos'').to_f / 100)"
        }
        ruby {
            code => "event.set('xyz', event.get('xyz').to_f / 1000)"
        }
    }


    if "parse" not in [tags] { drop {} }

	
	date {
        # 04/07/21 00:01:24
        match => [ "logdate", "dd/MM/yy HH:mm:ss" ]
        timezone => "UTC"
        target => "@timestamp"
    }

    date {
        match => [ "tmpdatetime", "yyMMddHHmm" ]
        timezone => "UTC"
        target => "datetime"
    }

    mutate {
        remove_field => [ "tmpdatetime", "path", "host" ]
    }

    fingerprint {
        source => ["src_ip", "src_port"]
    }

    elapsed {
            start_tag => "transactionStarted"
            end_tag => "transactionTerminated"
            unique_id_field => "fingerprint"
    }


#-------------------------------------------------------------------------------

    aggregate {
      task_id => "%{src_port}"
       code => "
        map['src_port'] ||= event.get('src_port')
 		map['HOST'] ||= event.get('HOST')
		map[''kilos''] ||= event.get(''kilos'')
		map[''potatoes''] ||= event.get(''potatoes'')
		map['xyz'] ||= event.get('xyz')
		map['cod_es'] ||= event.get('abcde')
		map['terminal_es'] ||= event.get('terminal')	
        map['station'] ||= []
		map['station'] << {'carrots' => event.get('carrots')}
        map['station'] << {'message' => event.get('message')}
        map['station'] << {'gardening' => event.get('gardening')}
        map['station'] << {'diesel' => event.get('diesel')}
        map['station'] << {'top_secret' => event.get('top_secret')}
        map['station'] << {'green' => event.get('green')}
        map['station'] << {''potatoes'' => event.get(''potatoes'')}
        map['station'] << {''kilos'' => event.get(''kilos'')}
        map['station'] << {'blue' => event.get('blue')}
        map['station'] << {'part3' => event.get('part3')}
        map['station'] << {'Tarjeta_part3' => event.get('Tarjeta_part3')}
        map['station'] << {'bpin' => event.get('bpin')}
        map['station'] << {'letitbe' => event.get('letitbe')}
        map['station'] << {'terminal' => event.get('terminal')}
        map['station'] << {'cacauete' => event.get('cacauete')}
        map['station'] << {'fop' => event.get('fop')}
        map['station'] << {'xyz' => event.get('xyz')}
        map['station'] << {''potatoes'_devolucion' => event.get(''potatoes'_devolucion')}
        map['station'] << {'prado' => event.get('prado')}
        map['station'] << {'carrots' => event.get('carrots')}
        map['station'] << {'elapsed_time' => event.get('elapsed_time')}
        event.cancel()
       "
       push_previous_map_as_event => true
       timeout_task_id_field => "src_port"
       timeout => 900
    }





}

output {
    elasticsearch {
        hosts => ["https://acme.net:443"]
        user => "user"
        password => "password"
        ssl => true
        ilm_rollover_alias => "ilm"
        ilm_pattern => "000001"
        }
}

You are using push_previous_map_as_event. That assumes the order of events is preserved. Make sure pipeline.workers is set to 1 and pipeline.ordered evaluates to true.