Je suis entrain de mettre en place une configuration Logstash pour lire les données dans une base de données et écrire dans un ElasticSearch.
J'ai géré ma configuration par plusieurs fichiers .conf dans un seul répertoire et je lance Logstash en pointant sur ce répertoire.
J'ai un problème dans un index dans ElasticSearch : j'obtiens bien mes documents du type que j'attends, mais j'obtiens aussi des documents qui ne devraient pas être dans ce type.
En réalité, j'obtiens un document de chaque type configuré dans ce type... Je ne comprends pas.
Je ne trouve pas de solutions pour filtrer les données dans l'output, est-ce que vous auriez une solution ou une idée ?
Est-ce que le problème est reproductible avec un seul fichier config. Ensuite pouvez-vous joindre le fichier config?
Une note, l'utilisation de plusieurs types dans un index sera plus possible dans le future et est a éviter, un conseil ici serait de n'utiliser qu'un type par index
Si je mets mes deux configurations dans un seul fichier .conf, j'ai toujours le même problème.
Mes configurations sont toutes équivalentes à celle ci-dessous :
input {
jdbc {
# Chaine de connexion à la BDD
jdbc_connection_string => "jdbc:mysql://localhost:3306/bdd"
# User et password de la BDD
jdbc_user => "userbdd"
jdbc_password => "pwdbdd"
# Chemin du fichier JDBC driver
jdbc_driver_library => "/home/logstash/config/mysql-connector-java-5.1.43-bin.jar"
# Le nom du driver JDBC
jdbc_driver_class => "com.mysql.jdbc.Driver"
# La requête à executer dans la BDD
statement => "SELECT
'NomDuType' as docIndex,
..."
#Lecture dans la base toutes les secondes
schedule => "* * * * * *"
#On veut garder les fields dans ES en case sensitive
lowercase_column_names => false
}
}
filter {
if [docIndex] == 'NomDuType' {
if [archived] == 1 {
ruby {
code => "event.set('archived', TRUE)"
}
} else {
ruby {
code => "event.set('archived', FALSE)"
}
}
if [sagaDeleted] == 1 {
mutate {
add_field => {
"[@metadata][elasticsearch_action]" => "delete"
}
}
mutate {
remove_field => [ "docIndex", "sagaDeleted" ]
}
} else {
mutate {
add_field => {
"[@metadata][elasticsearch_action]" => "index"
}
}
mutate {
remove_field => [ "docIndex", "sagaDeleted" ]
}
}
}
}
#Données à écrire
output {
#Configuration de l'écriture des logs (stdout, file...)
stdout { codec => json_lines }
#Enregistrement dans elasticsearch
elasticsearch {
# Serveur ES
"hosts" => "localhost:9200"
# Nom de l'index dans lequel les données sont enregistrées
"index" => "test-index"
# Action à effectuer (index ou delete)
"action" => "%{[@metadata][elasticsearch_action]}"
# Nom du type dans lequel les données sont enregistrées
"document_type" => "NomDuType"
# Id du document (pour ne pas réindexer les documents à chaque requête)
"document_id" => "%{tableId}"
}
}
Je ne vois pas de filtre (where clause) sur un champs qui sépare les données a insérer dans l'exemple?
Aussi docIndex semble inutile car c'est un champs avec une value fixe ('NomDuType') dans l'input donc la condition dans le filtre sur le champs docIndex sera toujours vraix
Dans la partie Input por le scenario, nous devrions voir quelque chose comme ca - et le meme select statement peut être testé avec le client mysql pour verifier que seulement les enregistrements voulu sont retournés:
Select list, of, fields from table_name where some_column = "value_used_to_separate_data"
Dans ce cas la, dans la partie output de elasticsearch :
"document_type" => "NomDuType" est incorrect,
"document_type" => "%{docIndex}" devrait accomplir ce que vous cherchez a faire - ceci utilisera la valeur du champs docIndex retourné par le SELECT (qui si je comprends bien sera de valeur NomDuType dans le cas donné et surement une autre valeur pour un autre select a dirige vers un autre type)
Comme dit initialement le mieux serait de faire la separation sur index et non pas document_type afin que chaque index est un seul type.
J'ai teste ca en 5.5.2 avec mysql, ceci envoie les 2 documents dans 2 types en utilisant un champs de la table:
Mysql statements for reference
DROP DATABASE TEST;
CREATE DATABASE TEST;
USE TEST;
CREATE TABLE Persons (ID varchar(30),
LastName varchar(255),
FirstName varchar(255),
City varchar(255)
);
INSERT INTO Persons (ID, LastName, FirstName, City) VALUES
('1', 'Roberts','John','London');
INSERT INTO Persons (ID, LastName, FirstName, City) VALUES
('2', 'Smith','Bob','Washington');
logstash config file:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/"
jdbc_user => "root"
jdbc_password => "changeme"
jdbc_driver_library => "/Users/julien/logstashcfg/mysql-connector-java-5.1.44/mysql-connector-java-5.1.44-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
statement => "SELECT * from TEST.Persons" #Read of the entire table every second (for test purpose only as this is bad for performance - will use delta selection in prod)
schedule => "* * * * * *"
lowercase_column_names => false
}
}
output {
elasticsearch {
index => "test_mysql"
user => "elastic"
password => "changeme"
document_id => "%{ID}"
document_type => "%{City}"
}
}
Apache, Apache Lucene, Apache Hadoop, Hadoop, HDFS and the yellow elephant
logo are trademarks of the
Apache Software Foundation
in the United States and/or other countries.