[Logstash] Plusieurs fichiers .conf et indexation d'un type X dans un type Y

Bonjour !

Je suis entrain de mettre en place une configuration Logstash pour lire les données dans une base de données et écrire dans un ElasticSearch.
J'ai géré ma configuration par plusieurs fichiers .conf dans un seul répertoire et je lance Logstash en pointant sur ce répertoire.

J'ai un problème dans un index dans ElasticSearch : j'obtiens bien mes documents du type que j'attends, mais j'obtiens aussi des documents qui ne devraient pas être dans ce type.
En réalité, j'obtiens un document de chaque type configuré dans ce type... Je ne comprends pas.

Je ne trouve pas de solutions pour filtrer les données dans l'output, est-ce que vous auriez une solution ou une idée ?

Merci d'avance pour votre aide !

Est-ce que le problème est reproductible avec un seul fichier config. Ensuite pouvez-vous joindre le fichier config?

Une note, l'utilisation de plusieurs types dans un index sera plus possible dans le future et est a éviter, un conseil ici serait de n'utiliser qu'un type par index

Bonjour,

Si je mets mes deux configurations dans un seul fichier .conf, j'ai toujours le même problème.

Mes configurations sont toutes équivalentes à celle ci-dessous :

input {
	jdbc {
		# Chaine de connexion à la BDD
		jdbc_connection_string => "jdbc:mysql://localhost:3306/bdd"
		# User et password de la BDD
		jdbc_user => "userbdd"
		jdbc_password => "pwdbdd"
		# Chemin du fichier JDBC driver
		jdbc_driver_library => "/home/logstash/config/mysql-connector-java-5.1.43-bin.jar"
		# Le nom du driver JDBC
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		# La requête à executer dans la BDD
		statement => "SELECT
                          'NomDuType' as docIndex,
                          ..."
		#Lecture dans la base toutes les secondes
		schedule => "* * * * * *"
		#On veut garder les fields dans ES en case sensitive
		lowercase_column_names => false
	}
}

filter {
    if [docIndex] == 'NomDuType' {

        if [archived] == 1 {
            ruby {
                code => "event.set('archived', TRUE)"
            }
        } else {
            ruby {
                code => "event.set('archived', FALSE)"
            }
        }

        if [sagaDeleted] == 1 {
            mutate {
                add_field => {
                    "[@metadata][elasticsearch_action]" => "delete"
                }
            }
            mutate {
                remove_field => [ "docIndex", "sagaDeleted" ]
            }
        } else {
            mutate {
                add_field => {
                    "[@metadata][elasticsearch_action]" => "index"
                }
            }
            mutate {
                remove_field => [ "docIndex", "sagaDeleted" ]
            }
        }
    }
}

#Données à écrire
output {
	#Configuration de l'écriture des logs (stdout, file...)
	stdout { codec => json_lines }
	#Enregistrement dans elasticsearch
	elasticsearch {
		# Serveur ES
		"hosts" => "localhost:9200"
		# Nom de l'index dans lequel les données sont enregistrées
		"index" => "test-index"
		# Action à effectuer (index ou delete)
		"action" => "%{[@metadata][elasticsearch_action]}"
		# Nom du type dans lequel les données sont enregistrées
		"document_type" => "NomDuType"
		# Id du document (pour ne pas réindexer les documents à chaque requête)
		"document_id" => "%{tableId}"
	}
}

Je ne vois pas de filtre (where clause) sur un champs qui sépare les données a insérer dans l'exemple?
Aussi docIndex semble inutile car c'est un champs avec une value fixe ('NomDuType') dans l'input donc la condition dans le filtre sur le champs docIndex sera toujours vraix

Dans la partie Input por le scenario, nous devrions voir quelque chose comme ca - et le meme select statement peut être testé avec le client mysql pour verifier que seulement les enregistrements voulu sont retournés:
Select list, of, fields from table_name where some_column = "value_used_to_separate_data"

Bonjour et merci pour votre réponse !

Non, il n'y a pas de where dans la requête SQL car je veux indexer toutes les données de ma table dans l'index ElasticSearch.

Le "docIndex" permet justement de n'effectuer le filter que sur les données de cette requête SQL, et pas sur les données des autres requêtes SQL configurées. J'ai vu cette astuce dans plusieurs exemples sur internet : https://stackoverflow.com/questions/18330541/how-to-handle-multiple-heterogeneous-inputs-with-logstash

Dans ce cas la, dans la partie output de elasticsearch :
"document_type" => "NomDuType" est incorrect,

"document_type" => "%{docIndex}" devrait accomplir ce que vous cherchez a faire - ceci utilisera la valeur du champs docIndex retourné par le SELECT (qui si je comprends bien sera de valeur NomDuType dans le cas donné et surement une autre valeur pour un autre select a dirige vers un autre type)
Comme dit initialement le mieux serait de faire la separation sur index et non pas document_type afin que chaque index est un seul type.

J'ai teste ca en 5.5.2 avec mysql, ceci envoie les 2 documents dans 2 types en utilisant un champs de la table:

Mysql statements for reference

DROP DATABASE TEST;
CREATE DATABASE TEST;
USE TEST;
CREATE TABLE Persons (ID varchar(30),
LastName varchar(255),
FirstName varchar(255),
City varchar(255)
);
INSERT INTO Persons (ID, LastName, FirstName, City) VALUES
('1', 'Roberts','John','London');
INSERT INTO Persons (ID, LastName, FirstName, City) VALUES
('2', 'Smith','Bob','Washington');

logstash config file:

input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/"
jdbc_user => "root"
jdbc_password => "changeme"
jdbc_driver_library => "/Users/julien/logstashcfg/mysql-connector-java-5.1.44/mysql-connector-java-5.1.44-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
statement => "SELECT * from TEST.Persons"
#Read of the entire table every second (for test purpose only as this is bad for performance - will use delta selection in prod)
schedule => "* * * * * *"
lowercase_column_names => false
}
}
output {
elasticsearch {
index => "test_mysql"
user => "elastic"
password => "changeme"
document_id => "%{ID}"
document_type => "%{City}"
}
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.