Hello
I am having trouble merging data from several SQL tables.
I can see the data being saved in ELK as the different SQL entries come in, but when the events reach the aggregation filter, the fields are renamed correctly yet the data ends up duplicated across separate events instead of being appended to the same one.
My case is shown below.
Here is one of the input pipelines; the others I want to merge are orchestrated the same way:
input {
  jdbc {
    jdbc_connection_string => "xxxxxx"
    jdbc_user => "xxxxxx"
    jdbc_password => "xxxxxx"
    jdbc_driver_library => "/usr/share/logstash/tools/psql-connector-java/postgresql-42.3.1.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    statement_filepath => "/etc/logstash/conf.d/pipeline/xxxxxx.sql"
    use_column_value => true
    tracking_column => "dernier_acces_utilisateur_unix"
    tracking_column_type => "numeric"
    last_run_metadata_path => "/usr/share/logstash/counter/.logstash-jdbc-lastrun-01-xxxxxx"
    schedule => "*/10 * * * *"
  }
}
filter {
  date {
    match => [ "modification_role_unix", "UNIX" ]
    target => "modification_role"
  }
  date {
    match => [ "premier_acces_utilisateur_unix", "UNIX" ]
    target => "premier_acces_utilisateur"
  }
  date {
    match => [ "dernier_acces_utilisateur_unix", "UNIX" ]
    target => "dernier_acces_utilisateur"
  }
  date {
    match => [ "dernier_connexion_utilisateur_unix", "UNIX" ]
    target => "dernier_connexion_utilisateur"
  }
  date {
    match => [ "actuelle_connexion_utilisateur_unix", "UNIX" ]
    target => "actuelle_connexion_utilisateur"
  }
  date {
    match => [ "creation_compte_utilisateur_unix", "UNIX" ]
    target => "creation_compte_utilisateur"
  }
  date {
    match => [ "modification_compte_utilisateur_unix", "UNIX" ]
    target => "modification_compte_utilisateur"
  }
  mutate {
    remove_field => ["premier_acces_utilisateur_unix", "dernier_acces_utilisateur_unix", "dernier_connexion_utilisateur_unix", "actuelle_connexion_utilisateur_unix", "creation_compte_utilisateur_unix", "modification_compte_utilisateur_unix"]
  }
  if [validation_rgpd_utilisateur] == 0 {
    mutate {
      replace => { "validation_rgpd_utilisateur" => "Non" }
    }
  } else {
    mutate {
      replace => { "validation_rgpd_utilisateur" => "Oui" }
    }
  }
}
output {
  if [email_utilisateur] =~ /^[a-zA-Z0-9._-]+@[a-zA-Z0-9._-]{2,}\.[a-z]{2,4}$/ {
    pipeline {
      send_to => "filter-merge-output"
    }
  }
}
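For debugging, a temporary stdout output can sit next to the pipeline output to check what each source actually emits before it reaches the merge pipeline (a sketch only, to be removed afterwards):
output {
  # Debugging only: print every event so the fields coming from this
  # source can be inspected before aggregation
  stdout { codec => rubydebug }
  if [email_utilisateur] =~ /^[a-zA-Z0-9._-]+@[a-zA-Z0-9._-]{2,}\.[a-z]{2,4}$/ {
    pipeline {
      send_to => "filter-merge-output"
    }
  }
}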
Next, here is the pipeline that merges and aggregates the events:
input {
  pipeline {
    address => "filter-merge-output"
  }
}
filter {
  aggregate {
    task_id => "%{id_utilisateur}"
    code => "
      map['utilisateur'] = {
        'id' => event.get('id_utilisateur'),
        'nom' => event.get('nom_utilisateur'),
        'prenom' => event.get('prenom_utilisateur'),
        'email' => event.get('email_utilisateur'),
        'dernierIp' => event.get('derniere_ip_utilisateur'),
        'validationRgpd' => event.get('validation_rgpd_utilisateur'),
        'premierAcces' => event.get('premier_acces_utilisateur'),
        'dernierAcces' => event.get('dernier_acces_utilisateur'),
        'derniereConnexion' => event.get('dernier_connexion_utilisateur'),
        'connexionActuelle' => event.get('actuelle_connexion_utilisateur'),
        'creationCompte' => event.get('creation_compte_utilisateur'),
        'modificationCompte' => event.get('modification_compte_utilisateur')
      }
      map['utilisateur']['liaisonTrax'] = {
        'idTrax' => event.get('id_utilisateur'),
        'email' => event.get('email_trax_utilisateur'),
        'uuid' => event.get('uuid_trax_utilisateur')
      }
      map['utilisateur']['role'] = {
        'id' => event.get('id_role'),
        'nom' => event.get('nom_role'),
        'modification' => event.get('modification_role')
      }
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout => 300
    timeout_tags => [ '_aggregatetimeout' ]
  }
}
output {
  elasticsearch {
    hosts => ["xxxxxxx"]
    ssl => true
    cacert => "xxxxxx"
    user => "xxxxx"
    password => "xxxxx"
    ilm_rollover_alias => "xxxxx"
    ilm_pattern => "xxxx"
    ilm_policy => "xxxxxxx"
  }
}
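If I read the aggregate code correctly, every incoming event assigns a brand new hash to map['utilisateur'], so a later event replaces what an earlier one stored instead of merging into it; that would explain why each resulting document only carries the fields of its own source. A plain-Ruby sketch of that pattern (values made up for illustration):
map = {}
# first event, from the user query:
map['utilisateur'] = { 'id' => 349, 'nom' => 'xxxxxx' }
# second event, from the role query: this assignment replaces the whole
# hash stored above, it does not merge into it
map['utilisateur'] = { 'role' => { 'id' => 349, 'nom' => 'etudiant' } }
# map['utilisateur'] now only contains 'role'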
And finally the pipelines.yml:
# INPUT
- pipeline.id: "xxxxx-input-utilisateur"
  path.config: "/etc/logstash/conf.d/pipeline/input/xxxxx.conf"
- pipeline.id: "xxxxx-input-xxxxx-utilisateur"
  path.config: "/etc/logstash/conf.d/pipeline/input/xxxxx.conf"
- pipeline.id: "xxxxx-input-utilisateur-role"
  path.config: "/etc/logstash/conf.d/pipeline/input/xxxxx.conf"
# FILTER & OUTPUT
- pipeline.id: "filter-merge-output"
  path.config: "/etc/logstash/conf.d/pipeline/filter-output/filter-merge-output.conf"
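As far as I know, the aggregate filter only works reliably with a single worker, so the merge pipeline presumably also needs that setting; a sketch of what I believe it would look like:
- pipeline.id: "filter-merge-output"
  path.config: "/etc/logstash/conf.d/pipeline/filter-output/filter-merge-output.conf"
  # aggregate requires a single worker so events are processed in sequence
  pipeline.workers: 1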
So instead of one merged event, the data arrives in the application as two partial documents. One looks like this:
"utilisateur": {
"role": {
"id": null,
"modification": null,
"nom": null
},
"validationRgpd": "Oui",
"liaisonTrax": {
"email": null,
"uuid": null,
"idTrax": 349
},
"modificationCompte": "2021-10-08T17:48:33.000Z",
"derniereConnexion": "2021-11-07T17:32:16.000Z",
"id": 349,
"creationCompte": "2021-10-08T17:48:33.000Z",
"dernierAcces": "2021-11-15T10:48:57.000Z",
"connexionActuelle": "2021-11-15T10:47:33.000Z",
"nom": "xxxxxx",
"prenom": "xxxxx",
"dernierIp": "xxxxxxx",
"premierAcces": "2021-10-10T20:04:15.000Z",
"email": "xxxxxxx@xxxxx.fr"
}
and the other like this:
"utilisateur": {
"role": {
"id": 349,
"modification": "2021-10-08T17:48:33.000Z",
"nom": "etudiant"
},
"validationRgpd": null,
"liaisonTrax": {
"email": null,
"uuid": null,
"idTrax": 349
},
"modificationCompte": null,
"derniereConnexion": null,
"id": null,
"creationCompte": null,
"dernierAcces": null,
"connexionActuelle": null,
"nom": null,
"prenom": null,
"dernierIp": null,
"premierAcces": null,
"email": null
What I want instead:
"utilisateur": {
"role": {
"id": 349,
"modification": "2021-10-08T17:48:33.000Z",
"nom": "xxxxxx"
},
"validationRgpd": "Oui",
"liaisonTrax": {
"email": "xxxxxxx@xxxxx.fr",
"uuid": "a72965aa-494c-4367-ba60-63a1d56b508c" ,
"idTrax": 349
},
"modificationCompte": "2021-10-08T17:48:33.000Z",
"derniereConnexion": "2021-11-07T17:32:16.000Z",
"id": 349,
"creationCompte": "2021-10-08T17:48:33.000Z",
"dernierAcces": "2021-11-15T10:48:57.000Z",
"connexionActuelle": "2021-11-15T10:47:33.000Z",
"nom": "xxxxxx",
"prenom": "xxxxx",
"dernierIp": "xxxxxxx",
"premierAcces": "2021-10-10T20:04:15.000Z",
"email": "xxxxxxx@xxxxx.fr"
}
Thank you in advance for your help.
Kind regards