Hello,
I'm trying to configure an instance of Logstash to read into 3 differents databases and to output, for each database, into a specific ElasticSearch index (one database <-> one index).
For each databases, the structure is the same, only datas are different.
I wanted to use one config file for each document type I want to index, and duplicate this file for each database, but I'm using the "aggregate" filter, and when I launch Logstash, I get this error "Aggregate plugin: For task_id pattern '%{compId}', there are more than one filter which defines timeout options. All timeout options have to be defined in only one aggregate filter per task_id pattern".
I tried to add the database name into the "task_id" field (task_id => "database1_%{compId}" for database1, task_id => "database2_%{compId}" for database2) but the ElasticSearch output is not good anymore... Aggregations doesn't work properly.
I have no more idea to fix this issue... Could you please help me ?
An exemple of config file (I have around twenty of files like this) :
input {
jdbc {
# Chaine de connexion Ă la BDD
jdbc_connection_string => "jdbc:mysql://localhost:3306/bdd"
# User et password de la BDD
jdbc_user => "userbdd"
jdbc_password => "pwdbdd"
# Chemin du fichier JDBC driver
jdbc_driver_library => "/home/logstash/config/mysql-connector-java-5.1.43-bin.jar"
# Le nom du driver JDBC
jdbc_driver_class => "com.mysql.jdbc.Driver"
# La requĂȘte Ă executer dans la BDD
statement => "SELECT
'NomDuType' as docIndex,
..."
#Lecture dans la base toutes les secondes
schedule => "* * * * * *"
#On veut garder les fields dans ES en case sensitive
lowercase_column_names => false
}
}
filter {
if [docIndex] == 'NomDuType' {
aggregate {
task_id => "%{compId}"
code => "
map['compId'] = event.get('compId')
map['memberGroups'] ||= []
map['memberGroups'] << event.get('memberGroupsId') unless map['memberGroups'].include?(event.get('memberGroupsId'))
map['attachments'] ||= []
map['attachments'] << event.get('attachmentsId') unless map['attachments'].include?(event.get('attachmentsId'))
if (map['attachments'].length > 0 and map['attachments'][0] != nil)
map['jointCount'] = map['attachments'].length
end
event.cancel()
"
push_previous_map_as_event => true
}
if [archived] == 1 {
ruby {
code => "event.set('archived', TRUE)"
}
} else {
ruby {
code => "event.set('archived', FALSE)"
}
}
if [sagaDeleted] == 1 {
mutate {
add_field => {
"[@metadata][elasticsearch_action]" => "delete"
}
}
mutate {
remove_field => [ "docIndex", "sagaDeleted" ]
}
} else {
mutate {
add_field => {
"[@metadata][elasticsearch_action]" => "index"
}
}
mutate {
remove_field => [ "docIndex", "sagaDeleted" ]
}
}
}
}
#Données à écrire
output {
#Configuration de l'Ă©criture des logs (stdout, file...)
stdout { codec => json_lines }
#Enregistrement dans elasticsearch
elasticsearch {
# Serveur ES
"hosts" => "localhost:9200"
# Nom de l'index dans lequel les données sont enregistrées
"index" => "test-index"
# Action Ă effectuer (index ou delete)
"action" => "%{[@metadata][elasticsearch_action]}"
# Nom du type dans lequel les données sont enregistrées
"document_type" => "NomDuType"
# Id du document (pour ne pas rĂ©indexer les documents Ă chaque requĂȘte)
"document_id" => "%{tableId}"
}
}
Thanks