Aggregate events from multiple pipelines in Logstash

Hello,
I am having trouble merging data coming from different SQL tables.
I can see the data from each SQL query being saved in ELK, but when the events reach the aggregate filter, the fields are renamed correctly yet each source ends up in its own, largely empty document instead of being merged into a single event.
Here is my setup:

Here is one of the input pipelines; the other pipelines I want to merge are built the same way:


input {
  jdbc {
    jdbc_connection_string => "xxxxxx"
    jdbc_user => "xxxxxx"
    jdbc_password => "xxxxxx"
    jdbc_driver_library => "/usr/share/logstash/tools/psql-connector-java/postgresql-42.3.1.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    statement_filepath => "/etc/logstash/conf.d/pipeline/xxxxxx.sql"
    use_column_value => true
    tracking_column => "dernier_acces_utilisateur_unix"
    tracking_column_type => "numeric"
    last_run_metadata_path => "/usr/share/logstash/counter/.logstash-jdbc-lastrun-01-xxxxxx"
    schedule => "*/10 * * * *"
  }
}

filter{
  date {
    match => [ "modification_role_unix", "UNIX" ]
    target => "modification_role"
  }
  date {
    match => [ "premier_acces_utilisateur_unix", "UNIX" ]
    target => "premier_acces_utilisateur"
  }
  date {
    match => [ "dernier_acces_utilisateur_unix", "UNIX" ]
    target => "dernier_acces_utilisateur"
  }
  date {
    match => [ "dernier_connexion_utilisateur_unix", "UNIX" ]
    target => "dernier_connexion_utilisateur"
  }
  date {
    match => [ "actuelle_connexion_utilisateur_unix", "UNIX" ]
    target => "actuelle_connexion_utilisateur"
  }
  date {
    match => [ "creation_compte_utilisateur_unix", "UNIX" ]
    target => "creation_compte_utilisateur"
  }
  date {
    match => [ "modification_compte_utilisateur_unix", "UNIX" ]
    target => "modification_compte_utilisateur"
  }
  mutate {
    remove_field => ["premier_acces_utilisateur_unix", "dernier_acces_utilisateur_unix",  "dernier_connexion_utilisateur_unix", "actuelle_connexion_utilisateur_unix", "creation_compte_utilisateur_unix",  "modification_compte_utilisateur_unix"]
  }
  if [validation_rgpd_utilisateur] == 0 {
    mutate {
      replace => { "validation_rgpd_utilisateur" => "Non" }
    } 
  } else {
    mutate {
      replace => { "validation_rgpd_utilisateur" => "Oui" }
    } 
  }
}

output {
  if ([email_utilisateur] =~ /^[a-zA-Z0-9._-]+@[a-zA-Z0-9._-]{2,}\.[a-z]{2,4}$/) {
    pipeline {
      send_to => "filter-merge-output"
    }
  }
}

After that, here is the pipeline that merges and aggregates the events:

input {
  pipeline {
    address => "filter-merge-output"
  }
}

filter {
  aggregate {
    task_id => "%{id_utilisateur}"
    code => "
            map['utilisateur'] = {
            'id' => event.get('id_utilisateur'),
            'nom' => event.get('nom_utilisateur'),
            'prenom' => event.get('prenom_utilisateur'),
            'email' => event.get('email_utilisateur'),
            'dernierIp' => event.get('derniere_ip_utilisateur'),
            'validationRgpd' => event.get('validation_rgpd_utilisateur'),
            'premierAcces' => event.get('premier_acces_utilisateur'),
            'dernierAcces' => event.get('dernier_acces_utilisateur'),
            'derniereConnexion' => event.get('dernier_connexion_utilisateur'),
            'connexionActuelle' => event.get('actuelle_connexion_utilisateur'),
            'creationCompte' => event.get('creation_compte_utilisateur'),
            'modificationCompte' => event.get('modification_compte_utilisateur')
            }
            map['utilisateur']['liaisonTrax'] = {
                'idTrax' => event.get('id_utilisateur'),
                'email' => event.get('email_trax_utilisateur'),
                'uuid' => event.get('uuid_trax_utilisateur')
            }
            map['utilisateur']['role'] = {
              'id' => event.get('id_role'),
              'nom' => event.get('nom_role'),
              'modification' => event.get('modification_role')
            }
          event.cancel();
          "
    push_map_as_event_on_timeout => true 
    timeout => 300
    timeout_tags => [ '_aggregatetimeout' ] 
  }
}

output {
  elasticsearch {
    hosts => ["xxxxxxx"]
    ssl => true
    cacert => "xxxxxx"
    user => "xxxxx"
    password => "xxxxx"
    ilm_rollover_alias => "xxxxx"
    ilm_pattern => "xxxx"
    ilm_policy => "xxxxxxx"
  }
}

And finally the pipelines.yml:

#INPUT
- pipeline.id: "xxxxx-input-utilisateur"
  path.config: "/etc/logstash/conf.d/pipeline/input/xxxxx.conf"

- pipeline.id: "xxxxx-input-xxxxx-utilisateur"
  path.config: "/etc/logstash/conf.d/pipeline/input/xxxxx.conf"

- pipeline.id: "xxxxx-input-utilisateur-role"
  path.config: "/etc/logstash/conf.d/pipeline/input/xxxxx.conf"

# FILTER & OUTPUT
- pipeline.id: "filter-merge-output"
  path.config: "/etc/logstash/conf.d/pipeline/filter-output/filter-merge-output.conf"

So the data ingested into the application ends up looking like this:

 "utilisateur": {

      "role": {

        "id": null,

        "modification": null,

        "nom": null

      },

      "validationRgpd": "Oui",

      "liaisonTrax": {

        "email": null,

        "uuid": null,

        "idTrax": 349

      },

      "modificationCompte": "2021-10-08T17:48:33.000Z",

      "derniereConnexion": "2021-11-07T17:32:16.000Z",

      "id": 349,

      "creationCompte": "2021-10-08T17:48:33.000Z",

      "dernierAcces": "2021-11-15T10:48:57.000Z",

      "connexionActuelle": "2021-11-15T10:47:33.000Z",

      "nom": "xxxxxx",

      "prenom": "xxxxx",

      "dernierIp": "xxxxxxx",

      "premierAcces": "2021-10-10T20:04:15.000Z",

      "email": "xxxxxxx@xxxxx.fr"
}

Or like this:

 "utilisateur": {

      "role": {

        "id": 349,

        "modification":  "2021-10-08T17:48:33.000Z",

        "nom": "etudiant"

      },

      "validationRgpd": null,

      "liaisonTrax": {

        "email": null,

        "uuid": null,

        "idTrax": 349

      },

      "modificationCompte": null,

      "derniereConnexion": null,

      "id": null,

      "creationCompte": null,

      "dernierAcces": null,

      "connexionActuelle": null,

      "nom": null,

      "prenom": null,

      "dernierIp": null,

      "premierAcces": null,

      "email": null

What I want:

"utilisateur": {

      "role": {

        "id": 349,

        "modification": "2021-10-08T17:48:33.000Z",

        "nom": "xxxxxx"

      },

      "validationRgpd": "Oui",

      "liaisonTrax": {

        "email": "xxxxxxx@xxxxx.fr",

        "uuid": "a72965aa-494c-4367-ba60-63a1d56b508c" ,

        "idTrax": 349

      },

      "modificationCompte": "2021-10-08T17:48:33.000Z",

      "derniereConnexion": "2021-11-07T17:32:16.000Z",

      "id": 349,

      "creationCompte": "2021-10-08T17:48:33.000Z",

      "dernierAcces": "2021-11-15T10:48:57.000Z",

      "connexionActuelle": "2021-11-15T10:47:33.000Z",

      "nom": "xxxxxx",

      "prenom": "xxxxx",

      "dernierIp": "xxxxxxx",

      "premierAcces": "2021-10-10T20:04:15.000Z",

      "email": "xxxxxxx@xxxxx.fr"
}

Thank you in advance for the help.
Kind regards

You need to set pipeline.workers to 1 in your pipelines: the ones using the jdbc input, and also every pipeline that has an aggregate filter.
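That would mean adding pipeline.workers: 1 to every entry in pipelines.yml, roughly like this (same pipeline ids as posted above; the aggregate filter only behaves correctly with a single worker, otherwise events can be processed out of order):

#INPUT
- pipeline.id: "xxxxx-input-utilisateur"
  path.config: "/etc/logstash/conf.d/pipeline/input/xxxxx.conf"
  pipeline.workers: 1

- pipeline.id: "xxxxx-input-xxxxx-utilisateur"
  path.config: "/etc/logstash/conf.d/pipeline/input/xxxxx.conf"
  pipeline.workers: 1

- pipeline.id: "xxxxx-input-utilisateur-role"
  path.config: "/etc/logstash/conf.d/pipeline/input/xxxxx.conf"
  pipeline.workers: 1

# FILTER & OUTPUT
- pipeline.id: "filter-merge-output"
  path.config: "/etc/logstash/conf.d/pipeline/filter-output/filter-merge-output.conf"
  pipeline.workers: 1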

Have you tried that?


You are unconditionally assigning values into the map, even when they are null. Make the assignments conditional:

code => "
    map['utilisateur'] ||= {}
    map['utilisateur']['liaisonTrax'] ||= {}
    map['utilisateur']['role'] ||= {}
    map['utilisateur']['id'] ||= event.get('id_utilisateur')
    map['utilisateur']['nom'] ||= event.get('nom_utilisateur')
    ...
    map['utilisateur']['liaisonTrax']['idTrax'] ||= event.get('id_utilisateur')
    map['utilisateur']['liaisonTrax']['email'] ||= event.get('email_trax_utilisateur')
    ...
"
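Spelled out with all the field names from the original filter, the code block could look roughly like this (an untested sketch with the same structure and settings as the aggregate filter above):

code => "
  map['utilisateur'] ||= {}
  map['utilisateur']['liaisonTrax'] ||= {}
  map['utilisateur']['role'] ||= {}

  # ||= only assigns when the value is still nil, so fields already filled
  # by an earlier event are not overwritten by the nulls from the other sources.
  map['utilisateur']['id'] ||= event.get('id_utilisateur')
  map['utilisateur']['nom'] ||= event.get('nom_utilisateur')
  map['utilisateur']['prenom'] ||= event.get('prenom_utilisateur')
  map['utilisateur']['email'] ||= event.get('email_utilisateur')
  map['utilisateur']['dernierIp'] ||= event.get('derniere_ip_utilisateur')
  map['utilisateur']['validationRgpd'] ||= event.get('validation_rgpd_utilisateur')
  map['utilisateur']['premierAcces'] ||= event.get('premier_acces_utilisateur')
  map['utilisateur']['dernierAcces'] ||= event.get('dernier_acces_utilisateur')
  map['utilisateur']['derniereConnexion'] ||= event.get('dernier_connexion_utilisateur')
  map['utilisateur']['connexionActuelle'] ||= event.get('actuelle_connexion_utilisateur')
  map['utilisateur']['creationCompte'] ||= event.get('creation_compte_utilisateur')
  map['utilisateur']['modificationCompte'] ||= event.get('modification_compte_utilisateur')

  map['utilisateur']['liaisonTrax']['idTrax'] ||= event.get('id_utilisateur')
  map['utilisateur']['liaisonTrax']['email'] ||= event.get('email_trax_utilisateur')
  map['utilisateur']['liaisonTrax']['uuid'] ||= event.get('uuid_trax_utilisateur')

  map['utilisateur']['role']['id'] ||= event.get('id_role')
  map['utilisateur']['role']['nom'] ||= event.get('nom_role')
  map['utilisateur']['role']['modification'] ||= event.get('modification_role')

  event.cancel
"

With push_map_as_event_on_timeout => true the merged map is then flushed as a single event once the 300-second timeout expires.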

Hello Badger and leandrojmp,
Your suggestions solved my merge problem.
Thank you very much.
Kind regards
