I am trying to aggregate all of the log entries for database deadlocks into a single entry. Right now I'm focusing on MySQL's log output, but I need to so something very similar for MSSQL.
In comments I have the first line of 7 log events. I am using filebeat to ship these to my logstash instance. I have confirmed that the GROK is working correctly. I have removed the pipeline from the mysql module for filebeat and replicated it here in my logstash pipeline config.
What I am attempting to do is detect that there was a deadlock, and then capture any additional log entries that have the same mysql.thread_id for up to 5 seconds.
I have been fighting with the following config for too long and could use some help. I see no aggregate events, and I can see from the logstash debug logs that the aggregate context map entries are always zero.
I have my pipline configured to only use one worker:
- pipeline.id: mysql
pipeline.workers: 1 # Only one because of aggregation
path.config: "/etc/logstash/config/pipelines/mysql.config"
I am not a Ruby/Logstash syntax expert so I'm hoping my problem is as simple as that.
Sample Log:
2020-01-27T17:20:35.463747Z 54 [Note] InnoDB: Transactions deadlock detected, dumping detailed information.
2020-01-27T17:20:35.464942Z 54 [Note] InnoDB:
2020-01-27T17:20:35.465266Z 54 [Note] InnoDB: *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
2020-01-27T17:20:35.469270Z 54 [Note] InnoDB: *** (2) TRANSACTION:
2020-01-27T17:20:35.469384Z 54 [Note] InnoDB: *** (2) HOLDS THE LOCK(S):
2020-01-27T17:20:35.471165Z 54 [Note] InnoDB: *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
2020-01-27T17:20:35.473132Z 54 [Note] InnoDB: *** WE ROLL BACK TRANSACTION (1)
mysql.config
input {
beats {
port => 5044
}
}
filter {
grok {
match => {
"message" => [
"%{DATA:mysql.timestamp} %{NUMBER:mysql.thread_id:long} \[%{DATA:log.level}\] %{GREEDYDATA:mysql.message}",
"%{LOCALDATETIME:mysql.timestamp} (\\[%{DATA:log.level}\\] )?%{GREEDYDATA:mysql.message}",
"%{GREEDYDATA:mysql.message}"
]
}
pattern_definitions => {
"LOCALDATETIME" => "[0-9]+ %{TIME}"
}
}
date {
match => ["mysql.timestamp", "ISO8601", "yyMMdd H:m:s"]
}
mutate {
copy => {
"@timestamp" => "event.created"
}
}
# Start of a deadlock, we don't really know if the last log entry will
# be in order so we just rely on a timeout
if [mysql][message] =~ /Transactions deadlock detected/ {
aggregate {
task_id => "%{[mysql][thread_id]}"
add_tag => ["deadlock"]
code => "
map['log'] = event.get('mysql.message') + '\n';
map['log.level'] = 'ERROR';
map['thread'] = %{[mysql][thread_id]}
"
map_action => "create"
timeout => 5
timeout_timestamp_field => "@timestamp"
push_map_as_event_on_timeout => true
}
}
aggregate {
task_id => "%{[mysql][thread_id]}"
code => "map['deadlock'] += event.get('mysql.message') + '\n'"
# Only aggregates if there was a recent deadlock for this same threadId
map_action => "update"
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}