Création d'un id elasticsearch d'elements transféré dans logstash

Bonjour,

Je suis entrain de monter en compétence sur ELK. je voudrais effectuer un traitement de log .
voici le contexte : j'ai deux entrées contenant deux log (chaque log contient des centaines de lignes)
ce que je souhaite faire pour là première log: log1
1- ajouter un champ vide dans la chaqu'un des messages de la log1
2- calculer un id pour chaque message de log (ce id sera utilise pour remplir le champ vide précédemment créé.
3- envoi du message parsé contenant les deux champs créés (un vide et le deuxième contenant l'id du message) à Elasticsearh.

Avec la deuxième log : log2,
je vais récupérer un champ et mettre à jour le message de la log1 déjà envoyé dans base elasticsearch.
Pour cela il me faut un id pour retrouver le message à mettre à jour.
D'où le calcul de la dans la log1.

Je cherche des pistes pour avancer sur le sujet .
ce qui bloque c'est l'interaction entre logstash et elasticsearch sur la parti calcul de l'id du message.

Est-ce possible de faire des calcules de ce type?
Avez vous des pistes à me donner
Merci par avance

Peut-être que le filtre fait par @fbaligand pourrait t'aider en fait à obtenir le résultat final que tu souhaites? https://www.elastic.co/blog/introducing-community-maintainers-for-logstash-plugins

Doc ici: https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html

Bonjour @ovs,

J'ai quelques questions pour bien comprendre ton contexte :

  • quand tu dis "log1" et "log2", tu veux dire que tu as deux fichiers de log en entrée de logstash ?
  • est-ce que tu parses d'abord tous les messages de log1, puis tous les messages de log2 ?
  • ou est-ce qu'au contraire, en terme d'ordre d'arrivée dans logstash, tu as un message de log1 qui arrive, puis le message associé de log2 ?
  • qu'est-ce qui te permet de corréler un message de log1 et un message de log2 ? Un identifiant de corrélation ?

Sinon, pour info, le plugin logstash "aggregate" est un filtre qui te permet d'aggéger des informations qui se trouvent dans plusieurs lignes d'une même tâche. Une tâche pouvant être une requête http, une exécution de batch, un nom de fichier, une session utilisateur, bref ce qui t'arrange.
Donc si pour une tâche donnée, tu as plusieurs messages provenant de log1 et de log2 avec un identifiant permettant de corréler la tâche, et que tu as un message estampillant la fin de la tâche dans lequel tu peux pousser toutes les informations que tu aggrégé des précédentes lignes de log de la tâche, alors oui, le plugin aggregate peut répondre à ton besoin.

  • quand tu dis "log1" et "log2", tu veux dire que tu as deux fichiers de log en entrée de logstash ?
    Oui il y'a deux inputs différents correspondant a chacune des log : log1 et log2

  • est-ce que tu parses d'abord tous les messages de log1, puis tous les messages de log2 ?
    oui je parses dans l'ordre les données de la log1 puis celles de la log2.
    pour les données de la log1, je fais le parsing puis j'ajoute un champs vide et j'envoie dans la base ElasticSearch
    Pour les données de la log2, je dois récupérer la valeur d'un champ pour mettre à jour chanps vide présent dans la base Elasticsearch et précédemment créé dans la log1.

  • ou est-ce qu'au contraire, en terme d'ordre d'arrivée dans logstash, tu as un message de log1 qui arrive, puis le message associé de log2 ? le message dans la log2 peux ne pas contenir d'informations associés à la log 1 dans ce cas le nouveau champ ajouté dans la log1 reste vide.

  • qu'est-ce qui te permet de corréler un message de log1 et un message de log2 ? Un identifiant de corrélation ? oui il une corrélation entre un message de log1 et un message de log2.

Je pensais a utiliser le filtre cksum commun du coup je calculerai le cksum de ce dénominateur commun et l'utiliserai comme _id dans Elasticsearch.
dans ce cas je mettrai a jour dans Elasticserach le champ vide du message de la log1 ayant le même id que le message de la log2

au passage dans le fichier de conf de logstash, dan sla partie OUTPUT, je peux mettre des condition : if , if else?

Merci pour votre réponse

Bonjour,

Tout d'abord, pour répondre à ta dernière question : oui, tu peux mettre des if/else dans les outputs logstash.
Par exemple, tu peux faire if [monChampInteressant] { elasticsearch {...} }

Pour ce qui est de l'utilisation du checksum comme id elasticsearch, je ne suis pas sûr que ce soit une bonne idée : si dans le temps, deux messages ont le même contenu servant à faire le checksum, ils aurant le même checksum, et donc le même id dans elasticsearch, ce qui fait que le dernier PUT de document vers elasticsearch écrasera l'ancien document.
Il vaut mieux laisser elasticsearch générer son propre identifiant, et stocker dans un champ à part, ton (ou tes) identifiant(s) fonctionnel(s).
Par ailleurs, un checksum ou l'identifiant qui te sert à faire le checksum revient au même.

Ensuite, si je comprends bien, tu as toujours un message de log1 puis un message de log2 et au final, un seul document dans elasticsearch aggrégeant les données de "message log1" et de "message log2".
Et tu as dans le message log1 et message log2, un champ contenant un identifiant permettant de corréler les 2 messages.
C'est bien ça ?

Si tel est le cas, je pense que stocker un premier document dans elasticsearch, pour le mettre à jour plus tard, est quelque peu accrobatique et c'est pas gagné que ça marche.

Par contre, je pense que le plugin aggregate peut résoudre ton problème.
Dans l'idée :

  • à l'arrivée du message log1, via le plugin aggregate, tu crées une map associée à ton identifiant de corrélation, avec tous les champs qui t'intéressent pour elasticsearch
  • à l'arrivée du message log2, via le plugin aggregate, tu copies les infos collectées dans la map aggrégée vers le "message log2", et ensuite tu envoies le document "message log2" (contenant toutes les infos de log1 et log2) vers elasticsearch.

Pour être concret, ça devrait donner qqchose comme ça en configuration logstash :

filter {

  if [log1] {
    aggregate {
      task_id => "%{champ_de_correlation_entre_log1_et_log2}"
      code => "map.merge!(event.to_hash)"   # copie tous les champs de l'évènement vers la map aggrégée
    }
  }

  if [log2] {
    aggregate {
      task_id => "%{champ_de_correlation_entre_log1_et_log2}"
      code => "event.to_hash.merge!(map)"   # copie tous les champs de la map aggrégée vers l'évènement 
      end_of_task => true # supprime la map aggrégée associée
    }
  }
} 

output {
  if [log2] {
      elasticsearch {
        ...
      }
  }
}

Bonjour fbaligand

Oui c'est bien le traitement que je souhaite et en plus du document agrégeant les deux logs, je voudrais conserver la log2 pour un autre traitement. Plus tard je vais réfléchir à une solution d'optimisation afin de ne pas avoir des informations en doublons dans la base . l'idéale serai d'intégrer cette optimisation dés maintenant mais je début avec cette solution, je ne sais pas si j'y arriverai.

Merci pour votre aide
Très cordialement

Si j'ai pu t'aider, j'en suis ravi.
Du coup, vas-tu utiliser la solution que je t'ai proposé ?
Si c'est le cas, dis-moi si ça marche et répond bien à ton besoin.

Par ailleurs, pour éviter les doublons, une fois que tu as "copié" les infos de log1 vers la map aggregate, tu peux éventuellement supprimer le message "log1".
Ça te permet de traiter un seul document synthèse pour tes différents besoins.