Problème de merge de logs Postgres

Bonjour,

Je souhaite indexer les logs Postgres qui sont stockés sur mes différents serveurs. Pour cela j'utilise Rsyslog pour envoyer les logs vers logstash. Les requêtes Postgres sont envoyés sur plusieurs lignes:

[35-1] 2017-06-09 11:10:18 CEST (548y65f2.b7e9 17) [local] user@postgres LOG: statement: SELECT field1, field2 field3
[35-2] #011 FROM table
[35-3] .......
[35-4] .......

J'ai aussi des logs sur une seul ligne du type:

[39-1] 2017-06-09 11:10:18 CEST (548y65f2.b7e9 21) [local] user@postgres LOG: duration: 1.451 ms

J’essaye de fusionner les logs via un aggregate filter mais les logs ne sont pas fusionnés comme il faut, la première ligne apparaît toujours 2 fois dans la ligne fusionné et parfois la première ligne apparaît à la fin de ma ligne.

Enfin, Logstash lit les logs au format json à partir d'un fichier dont le format est:

"path" => "/var/log/pg.log",
"@timestamp" => 2017-06-09T11:10:18.799Z,
"Ip" => "xxx.xxx.xxx.xxx",
"@version" => "1",
"Level" => "info",
"Host" => "PG",
"Pid" => "53943",
"Facility" => "local4",
"message" => " [39-1] 2017-06-09 11:10:18 CEST (548y65f2.b7e9 21) [local] user@postgres LOG: duration: 1.451 ms",
"Source" => "pg.PG"

Voici mon fichier de config:

input {
        file {
                path => [ "/var/log/pg.log" ]
                codec => json
        }
}
filter{
        grok {
                match => [ "Message", "\[%{INT:Line}-%{INT:Part_of_line}\] %{GREEDYDATA:temp}" ]
        }
        aggregate {
                task_id => "%{Pid}%{Line}"
                code =>  "
                    map.merge!(event) if map.empty?
                 map['Message'] ||= ''
                 map['Message'] += event.get('temp')
                "
                timeout => 10
                push_map_as_event_on_timeout => true
                timeout_code => "event.tag('aggregated')"
        }

        if "aggregated" not in [tags] {
            drop {}
        }
}
output {
        elasticsearch {
                hosts => ["localhost:9200"]
                index => "logstash-%{+YYYY.MM.dd}"
                template_name => "logevent"
        }
        stdout {
                codec => rubydebug
        }
}

Peut-être que @fbaligand pourrait aider ici ?

Plusieurs remarques :

  • as-tu bien mis l'option -w 1 au démarrage de Logstash ? (pour s'assurer d'avoir un seul thread)
  • Je pense que tu as déjà l'attribut 'Message' dans ton évènement. Du coup, l'instruction map.merge!(event) if map.empty? positionne déjà le premier message. Et l'instruction map['Message'] += event.get('temp') le double.
    => je t'invite à stocker le résultat aggrégé dans un autre champ que Message.
    Après, via un filtre mutate, tu peux éventuellement copier ce nouveau champ vers Message puis supprimer ce nouveau champ.
1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.