Hello,
I've tried to set the Aggregate filter plugin's `timeout` parameter to a value above 10 seconds.
The problem is that the Logstash pipeline closes before this delay has elapsed.
It seems that when Logstash has no events to send to the output section for a certain period of time, it closes the pipeline.
As a workaround, I tried to keep events flowing to the Logstash output while the Aggregate filter is still collecting, so that the pipeline stays open.
It works, but it's a bit slow.
Does anyone know what I am doing wrong?
Here is a sample of my Logstash configuration:
# Input section: runs a SQL query against an Informix database through the
# jdbc input plugin on a recurring schedule.
input {
  jdbc {
    jdbc_driver_library => "D:\ISPZ57\SERVEUR_ELK\logstash-6.4.2\config\driverJDBC\ifxjdbc-g.jar"
    jdbc_driver_class => "com.informix.jdbc.IfxDriver"
    # Connection settings are redacted placeholders; replace with quoted values.
    jdbc_connection_string => ************
    jdbc_user => ******
    jdbc_password => ******
    # Cron-style schedule. A step expression needs a base ("*/1"); a bare "/1"
    # is not a valid cron field.
    # NOTE(review): this expression has six fields; presumably the first one is
    # seconds, which would run the query every second - confirm this is intended.
    schedule => "*/1 * * * * *"
    # The SQL text is left unindented so the query string stays exactly as written.
    statement => "
SELECT ddemunv,
dstounv,
lnombtc,
ldsrfcn,
ndpt,
letabtc,
nrng,
llig
FROM a7491, a7492
LEFT JOIN a7496 on a7496.vfrk_a7492=a7492.vsrl_a7492
where vfrk_a7491= a7491.vsrl_a7491
and letabtc NOT LIKE 'En Cours'
"
  }
}
# Filter section: renames the raw query columns, drops rows without a segment,
# aggregates all segments of one batch into a single event pushed on timeout,
# then enriches that aggregated event.
filter {
  # Map the raw column names returned by the query to descriptive field names.
  mutate {
    rename => {
      "ddemunv" => "dollarU_BatchStartTime"
      "dstounv" => "dollarU_BatchEndTime"
      "lnombtc" => "idBatch"
      "ldsrfcn" => "descriptionBatch"
      "ndpt" => "caisse"
      "letabtc" => "batchStatus"
      "nrng" => "idSegment"
      "llig" => "segment"
    }
  }

  # Rows that carry no segment have nothing to contribute to the aggregation.
  if ![segment] {
    drop {}
  }

  # Collect every segment that shares the same end time / batch id / caisse
  # into one map. The source events are NOT cancelled (event.cancel is commented
  # out), so they continue to the outputs alongside the aggregated event.
  # NOTE(review): the aggregate filter documentation asks for a single pipeline
  # worker so that events of one task are processed in order - confirm the
  # pipeline is started that way.
  aggregate {
    task_id => "%{dollarU_BatchEndTime}-%{idBatch}-@%{caisse}"
    # The Ruby code is left unindented so the script string stays as written.
    code => "
map['dollarU_BatchStartTime'] = event.get('dollarU_BatchStartTime')
map['dollarU_BatchEndTime'] = event.get('dollarU_BatchEndTime')
map['idBatch'] = event.get('idBatch')
map['descriptionBatch'] = event.get('descriptionBatch')
map['caisse'] = event.get('caisse')
map['batchStatus'] = event.get('batchStatus')
map['compteRenduXML'] ||= [];
map['compteRenduXML'] << event.get('segment')
#event.cancel()
"
    # When the timeout expires, the map is emitted as a new event carrying
    # the tag below.
    push_map_as_event_on_timeout => true
    timeout => 60
    timeout_tags => [ "compteRenduAggregate_Successful" ]
  }

  # Concatenate the collected segments into one string (no separator).
  mutate {
    join => { "compteRenduXML" => "" }
  }

  # Enrichment applied only to the aggregated (timeout) event.
  if "compteRenduAggregate_Successful" in [tags] {
    # dollarU_BatchEndTime is used as the timestamp of the record;
    # the time zone is adjusted directly in Kibana.
    mutate {
      copy => { "dollarU_BatchEndTime" => "@timestamp" }
    }
    mutate {
      add_field => { "source" => "********" }
    }
    # Batch duration = end time - start time.
    # NOTE(review): assumes both fields are present and support subtraction;
    # verify against the types returned by the jdbc input.
    ruby {
      code => "
event.set('[dollarU_Duration]', event.get('[dollarU_BatchEndTime]') - event.get('[dollarU_BatchStartTime]'))
"
    }
    # External Ruby script applied to the aggregated event.
    ruby {
      path => "D:\ISPZ57\SERVEUR_ELK\logstash-6.4.2\config\BDD-OSF-scriptDecodageCr.rb"
    }
  }
}
# Output section: writes every event to a CSV-like file and to stdout, and
# indexes only the aggregated events into Elasticsearch.
output {
  # The codec/path settings belong to a file output plugin; without the
  # enclosing "file { ... }" block the closing brace ends the output section
  # early and the configuration does not parse.
  file {
    codec => line {
      format => "%{[dollarU_BatchEndTime]},%{[idBatch]},%{[caisse]},%{[compteRenduXML]},"
    }
    path => "D:\ISPZ57\SERVEUR_ELK\LOGS\bdd-osf-script.csv"
  }

  if "compteRenduAggregate_Successful" in [tags] {
    # Index the aggregated event into Elasticsearch.
    elasticsearch {
      hosts => ["localhost:9200", "localhost:9201"]
      manage_template => false
      index => "ispz57-osfbdd-v1-%{+YYYY.MM}"
      # NOTE(review): "idagg" is not set anywhere in the visible configuration;
      # presumably the external Ruby script adds it - confirm, otherwise the
      # literal text "%{idagg}" ends up in every document id.
      document_id => "%{dollarU_BatchEndTime}-%{idBatch}@%{caisse}%{idagg}"
    }
  }

  # Debug output of every event, including @metadata.
  stdout {
    codec => rubydebug { metadata => true }
  }
}