Multi input, multi output and aggregate filter


(Amandine B) #1

Hello,

I'm trying to configure an instance of Logstash to read into 3 differents databases and to output, for each database, into a specific ElasticSearch index (one database <-> one index).

For each databases, the structure is the same, only datas are different.

I wanted to use one config file for each document type I want to index, and duplicate this file for each database, but I'm using the "aggregate" filter, and when I launch Logstash, I get this error "Aggregate plugin: For task_id pattern '%{compId}', there are more than one filter which defines timeout options. All timeout options have to be defined in only one aggregate filter per task_id pattern".

I tried to add the database name into the "task_id" field (task_id => "database1_%{compId}" for database1, task_id => "database2_%{compId}" for database2) but the ElasticSearch output is not good anymore... Aggregations doesn't work properly.

I have no more idea to fix this issue... Could you please help me ?

An exemple of config file (I have around twenty of files like this) :

input {
	jdbc {
		# Chaine de connexion à la BDD
		jdbc_connection_string => "jdbc:mysql://localhost:3306/bdd"
		# User et password de la BDD
		jdbc_user => "userbdd"
		jdbc_password => "pwdbdd"
		# Chemin du fichier JDBC driver
		jdbc_driver_library => "/home/logstash/config/mysql-connector-java-5.1.43-bin.jar"
		# Le nom du driver JDBC
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		# La requête à executer dans la BDD
		statement => "SELECT
                          'NomDuType' as docIndex,
                          ..."
		#Lecture dans la base toutes les secondes
		schedule => "* * * * * *"
		#On veut garder les fields dans ES en case sensitive
		lowercase_column_names => false
	}
}

filter {
    if [docIndex] == 'NomDuType' {
	aggregate {
            task_id => "%{compId}"
            code => "
                map['compId'] = event.get('compId')

                map['memberGroups'] ||= []
                map['memberGroups'] << event.get('memberGroupsId') unless map['memberGroups'].include?(event.get('memberGroupsId'))

                map['attachments'] ||= []
                map['attachments'] << event.get('attachmentsId') unless map['attachments'].include?(event.get('attachmentsId'))

                if (map['attachments'].length > 0 and map['attachments'][0] != nil)
                                map['jointCount'] = map['attachments'].length
                end
                event.cancel()
            "
            push_previous_map_as_event => true
	}

        if [archived] == 1 {
            ruby {
                code => "event.set('archived', TRUE)"
            }
        } else {
            ruby {
                code => "event.set('archived', FALSE)"
            }
        }

        if [sagaDeleted] == 1 {
            mutate {
                add_field => {
                    "[@metadata][elasticsearch_action]" => "delete"
                }
            }
            mutate {
                remove_field => [ "docIndex", "sagaDeleted" ]
            }
        } else {
            mutate {
                add_field => {
                    "[@metadata][elasticsearch_action]" => "index"
                }
            }
            mutate {
                remove_field => [ "docIndex", "sagaDeleted" ]
            }
        }
    }
}

#Données à écrire
output {
	#Configuration de l'écriture des logs (stdout, file...)
	stdout { codec => json_lines }
	#Enregistrement dans elasticsearch
	elasticsearch {
		# Serveur ES
		"hosts" => "localhost:9200"
		# Nom de l'index dans lequel les données sont enregistrées
		"index" => "test-index"
		# Action à effectuer (index ou delete)
		"action" => "%{[@metadata][elasticsearch_action]}"
		# Nom du type dans lequel les données sont enregistrées
		"document_type" => "NomDuType"
		# Id du document (pour ne pas réindexer les documents à chaque requête)
		"document_id" => "%{tableId}"
	}
}

Thanks


(system) #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.