Aggregate filter plugin timeout weird behavior

Hello,

I've tried to configure the Aggregate filter plugin's timeout parameter above 10s.
The problem is that the Logstash pipeline closes before this delay elapses.

It seems that when Logstash has not sent any event to the output section for a certain period of time, it closes the pipeline.
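
Here is a minimal sketch of the behavior as I understand it (the generator input and the field names are just placeholders for testing, not my real setup):

input {
	# emit one test event, then the input goes idle
	generator {
		lines => [ "test" ]
		count => 1
	}
}
filter {
	aggregate {
		task_id => "%{message}"
		code => "map['seen'] ||= 0; map['seen'] += 1"
		push_map_as_event_on_timeout => true
		# the pipeline reaches end-of-input and shuts down
		# long before this 60s timeout ever fires
		timeout => 60
	}
}
output {
	stdout { codec => rubydebug }
}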

I tried a small trick so that the Logstash output still receives events while the aggregate filter does its work.
It works, but it's a bit slow.
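
For example, one way to keep events flowing to the output would be a heartbeat input (a sketch, not necessarily what my workaround looks like):

input {
	# emits a keepalive event every 5 seconds so the output
	# section always has traffic
	heartbeat {
		interval => 5
		message => "keepalive"
	}
}

The keepalive events then have to be dropped in the filter section before they reach the real outputs.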

Does anyone know what I'm doing wrong?

Here is a sample of my Logstash configuration:

input {
	jdbc {
		jdbc_driver_library => "D:\ISPZ57\SERVEUR_ELK\logstash-6.4.2\config\driverJDBC\ifxjdbc-g.jar"
		jdbc_driver_class => "com.informix.jdbc.IfxDriver"
		jdbc_connection_string => ************
		jdbc_user => ******
		jdbc_password => ******
		schedule => "/1 * * * * *"
		statement => "
			SELECT ddemunv,
			dstounv,
			lnombtc,
			ldsrfcn,
			ndpt,
			letabtc,
			nrng,
			llig
			FROM a7491, a7492
			LEFT JOIN a7496 ON a7496.vfrk_a7492 = a7492.vsrl_a7492
			WHERE vfrk_a7491 = a7491.vsrl_a7491
			AND letabtc NOT LIKE 'En Cours'
		"
	}
}

filter {

	mutate {
		rename => {
			"ddemunv" => "dollarU_BatchStartTime"
			"dstounv" => "dollarU_BatchEndTime"
			"lnombtc" => "idBatch"
			"ldsrfcn" => "descriptionBatch"
			"ndpt" => "caisse"
			"letabtc" => "batchStatus"
			"nrng" => "idSegment"
			"llig" => "segment"
		}
	}

	if ![segment] {
		drop {}
	}

	aggregate {

		task_id => "%{dollarU_BatchEndTime}-%{idBatch}-@%{caisse}"
		code => "
			map['dollarU_BatchStartTime'] = event.get('dollarU_BatchStartTime')
			map['dollarU_BatchEndTime'] = event.get('dollarU_BatchEndTime')
			map['idBatch'] = event.get('idBatch')
			map['descriptionBatch'] = event.get('descriptionBatch')
			map['caisse'] = event.get('caisse')
			map['batchStatus'] = event.get('batchStatus')

			map['compteRenduXML'] ||= []
			map['compteRenduXML'] << event.get('segment')
			#event.cancel()
		"
		push_map_as_event_on_timeout => true	
		timeout => 60
		timeout_tags => [ "compteRenduAggregate_Successful" ]
	}

	mutate {
		join => { "compteRenduXML" => "" }
	}

	if "compteRenduAggregate_Successful" in [tags] {
		
		# dollarU_BatchEndTime is used as the record's timestamp;
		# the time zone is handled directly in Kibana
		mutate {
			copy => { "dollarU_BatchEndTime" => "@timestamp" }
		}

		mutate {
			add_field => { "source" => "********" }
		}

		ruby {
			code => "
				# duration in seconds; to_f makes the subtraction safe
				# whether the fields are timestamps or already numeric
				event.set('dollarU_Duration', event.get('dollarU_BatchEndTime').to_f - event.get('dollarU_BatchStartTime').to_f)
			"
		}
		

		ruby {
			path => "D:\ISPZ57\SERVEUR_ELK\logstash-6.4.2\config\BDD-OSF-scriptDecodageCr.rb"
		}
	}
}

output {

	# write a CSV copy of every event via the file output
	file {
		codec => line {
			format => "%{[dollarU_BatchEndTime]},%{[idBatch]},%{[caisse]},%{[compteRenduXML]},"
		}
		path => "D:\ISPZ57\SERVEUR_ELK\LOGS\bdd-osf-script.csv"
	}
	
	if "compteRenduAggregate_Successful" in [tags] {
		# index the aggregated event into Elasticsearch
		elasticsearch {

			hosts => ["localhost:9200", "localhost:9201"]
			manage_template => false
			index => "ispz57-osfbdd-v1-%{+YYYY.MM}"
			document_id => "%{dollarU_BatchEndTime}-%{idBatch}@%{caisse}%{idagg}"
		}
	}

	stdout {
		codec => rubydebug { metadata => true }
	}
}

Can anyone help me?

I'll probably move to Spark :face_with_raised_eyebrow:
