Bonjour,
Je souhaite indexer les logs Postgres qui sont stockés sur mes différents serveurs. Pour cela j'utilise Rsyslog pour envoyer les logs vers logstash. Les requêtes Postgres sont envoyés sur plusieurs lignes:
[35-1] 2017-06-09 11:10:18 CEST (548y65f2.b7e9 17) [local] user@postgres LOG: statement: SELECT field1, field2 field3
[35-2] #011 FROM table
[35-3] .......
[35-4] .......
J'ai aussi des logs sur une seul ligne du type:
[39-1] 2017-06-09 11:10:18 CEST (548y65f2.b7e9 21) [local] user@postgres LOG: duration: 1.451 ms
J’essaye de fusionner les logs via un aggregate filter mais les logs ne sont pas fusionnés comme il faut, la première ligne apparaît toujours 2 fois dans la ligne fusionné et parfois la première ligne apparaît à la fin de ma ligne.
Enfin, Logstash lit les logs au format json à partir d'un fichier dont le format est:
"path" => "/var/log/pg.log",
"@timestamp" => 2017-06-09T11:10:18.799Z,
"Ip" => "xxx.xxx.xxx.xxx",
"@version" => "1",
"Level" => "info",
"Host" => "PG",
"Pid" => "53943",
"Facility" => "local4",
"message" => " [39-1] 2017-06-09 11:10:18 CEST (548y65f2.b7e9 21) [local] user@postgres LOG: duration: 1.451 ms",
"Source" => "pg.PG"
Voici mon fichier de config:
input {
file {
path => [ "/var/log/pg.log" ]
codec => json
}
}
filter{
grok {
match => [ "Message", "\[%{INT:Line}-%{INT:Part_of_line}\] %{GREEDYDATA:temp}" ]
}
aggregate {
task_id => "%{Pid}%{Line}"
code => "
map.merge!(event) if map.empty?
map['Message'] ||= ''
map['Message'] += event.get('temp')
"
timeout => 10
push_map_as_event_on_timeout => true
timeout_code => "event.tag('aggregated')"
}
if "aggregated" not in [tags] {
drop {}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-%{+YYYY.MM.dd}"
template_name => "logevent"
}
stdout {
codec => rubydebug
}
}