Problem with Aggregate Filter

I am trying to aggregate all of the log entries for database deadlocks into a single entry. Right now I'm focusing on MySQL's log output, but I need to do something very similar for MSSQL.

Below I have included the first line of each of 7 log events. I am using filebeat to ship these to my logstash instance. I have confirmed that the grok pattern is working correctly. I have removed the pipeline from the mysql module for filebeat and replicated it here in my logstash pipeline config.

What I am attempting to do is detect that there was a deadlock, and then capture any additional log entries that have the same mysql.thread_id for up to 5 seconds.

I have been fighting with the following config for too long and could use some help. I see no aggregate events, and I can see from the logstash debug logs that the aggregate context map entries are always zero.

I have my pipeline configured to only use one worker:

 - pipeline.id: mysql
   pipeline.workers: 1 # Only one because of aggregation
   path.config: "/etc/logstash/config/pipelines/mysql.config"

I am not a Ruby/Logstash syntax expert, so I'm hoping my problem is something as simple as a syntax mistake.

Sample Log:

2020-01-27T17:20:35.463747Z 54 [Note] InnoDB: Transactions deadlock detected, dumping detailed information.
2020-01-27T17:20:35.464942Z 54 [Note] InnoDB:
2020-01-27T17:20:35.465266Z 54 [Note] InnoDB: *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
2020-01-27T17:20:35.469270Z 54 [Note] InnoDB: *** (2) TRANSACTION:
2020-01-27T17:20:35.469384Z 54 [Note] InnoDB: *** (2) HOLDS THE LOCK(S):
2020-01-27T17:20:35.471165Z 54 [Note] InnoDB: *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
2020-01-27T17:20:35.473132Z 54 [Note] InnoDB: *** WE ROLL BACK TRANSACTION (1)

mysql.config

input {
  beats {
    port => 5044
  }
}

filter {
  grok {
    match => {
      "message" => [
        "%{DATA:mysql.timestamp} %{NUMBER:mysql.thread_id:long} \[%{DATA:log.level}\] %{GREEDYDATA:mysql.message}",
        "%{LOCALDATETIME:mysql.timestamp} (\\[%{DATA:log.level}\\] )?%{GREEDYDATA:mysql.message}",
        "%{GREEDYDATA:mysql.message}"
      ]
    }
    pattern_definitions => {
      "LOCALDATETIME" => "[0-9]+ %{TIME}"
    }
  }

  date {
    match => ["mysql.timestamp", "ISO8601", "yyMMdd H:m:s"]
  }

  mutate {
    copy => {
      "@timestamp" => "event.created"
    }
  }

  # Start of a deadlock, we don't really know if the last log entry will
  # be in order so we just rely on a timeout
  if [mysql][message] =~ /Transactions deadlock detected/ {
    aggregate {
      task_id => "%{[mysql][thread_id]}"
      add_tag => ["deadlock"]
      code => "
        map['log'] = event.get('mysql.message') + '\n';
        map['log.level'] = 'ERROR';
        map['thread'] = %{[mysql][thread_id]}
      "
      map_action => "create"

      timeout => 5
      timeout_timestamp_field => "@timestamp"
      push_map_as_event_on_timeout => true
    }
  }

  aggregate {
    task_id => "%{[mysql][thread_id]}"
    code => "map['deadlock'] += event.get('mysql.message') + '\n'"
    # Only aggregates if there was a recent deadlock for this same threadId
    map_action => "update"
  }

}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}

%{[mysql][thread_id]} refers to a thread_id field nested inside a mysql object. Your grok created a field with a period in its name -- mysql.thread_id.

I would definitely look at doing multiline processing in filebeat. You are going to run into problems doing the aggregate in logstash.
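
For reference, this is roughly what multiline settings look like in a filebeat input (a sketch only; the log path and the timestamp pattern are assumptions, not something taken from your setup):

filebeat.inputs:
  - type: log
    paths:
      - /var/log/mysql/error.log    # assumed path, adjust to your server
    # Any line that does not start with a timestamp is appended to the previous event
    multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}T'
    multiline.negate: true
    multiline.match: after

Note this only joins consecutive lines within one file, so it groups a single multi-line log statement but does not on its own collect entries that arrive interleaved from different threads.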

If you persist in using aggregate you must also disable pipeline.java_execution.
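
In the pipelines.yml shown above that would look something like this (a sketch of the same pipeline entry with the extra setting added):

 - pipeline.id: mysql
   pipeline.workers: 1 # Only one because of aggregation
   pipeline.java_execution: false # Disable the Java execution engine for this pipeline
   path.config: "/etc/logstash/config/pipelines/mysql.config"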

Thanks, I tried this as well and it didn't work either

task_id => "${mysql.thread_id}"

I am using multiline processing with filebeat to capture multiple lines per log statement. But can that help me gather up log entries over time that can be intermixed from various threads?

Obviously I'm running into problems :slight_smile: What OTHER problems are you referring to?

Thanks for the tip about java execution.

You were really close. The following produces

    "thread" => "54",
"@timestamp" => 2020-01-29T00:57:05.486Z,
 "log.level" => "ERROR",
       "log" => "InnoDB: Transactions deadlock detected, dumping detailed information.\n",
  "@version" => "1",
  "deadlock" => "InnoDB:\nInnoDB: *** (1) WAITING FOR THIS LOCK TO BE GRANTED:\nInnoDB: *** (2) TRANSACTION:\nInnoDB: *** (2) HOLDS THE LOCK(S):\nInnoDB: *** (2) WAITING FOR THIS LOCK TO BE GRANTED:\nInnoDB: *** WE ROLL BACK TRANSACTION (1)\n"

  if [mysql.message] =~ /Transactions deadlock detected/ {
    aggregate {
      task_id => "%{[mysql.thread_id]}"
      add_tag => ["deadlock"]
      code => "
        map['log'] = event.get('mysql.message') + '\n';
        map['log.level'] = 'ERROR';
        map['thread'] = event.get('mysql.thread_id')
        map['deadlock'] = ''
        event.cancel
      "
      map_action => "create"

      timeout => 5
      timeout_timestamp_field => "@timestamp"
      push_map_as_event_on_timeout => true
    }
  }

  aggregate {
    task_id => "%{[mysql.thread_id]}"
    code => "
        map['deadlock'] += event.get('mysql.message') + '\n'
        event.cancel
    "
    # Only aggregates if there was a recent deadlock for this same threadId
    map_action => "update"
  }

Note that I added the event.cancel calls so that you do not get the individual lines. Also, you cannot use a sprintf reference like %{[mysql][thread_id]} inside the ruby code of an aggregate filter, which is why I changed it to event.get. The big change was the

map['deadlock'] = ''

in the first aggregate. You could alternatively have fixed that using

map['deadlock'] ||= ''

in the second aggregate. In fact doing it in the second filter would probably be better, since the ||= (initialize if nil) is something any aggregate user is likely to be familiar with.
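
That version of the second aggregate would look like this (same fields as above, just with the ||= initialization added):

  aggregate {
    task_id => "%{[mysql.thread_id]}"
    code => "
        map['deadlock'] ||= ''
        map['deadlock'] += event.get('mysql.message') + '\n'
        event.cancel
    "
    # Only aggregates if there was a recent deadlock for this same threadId
    map_action => "update"
  }

With that in place, the map['deadlock'] = '' line in the first aggregate becomes optional.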

My comment about hitting problems was not helpful. You can make this work with aggregate, but while I enjoy finding innovative solutions using aggregate, they are limited in throughput (being single threaded) and they can be fragile: the first time filebeats on two different servers send you data using the same thread_id you will see this. That may sound improbable, but it takes two to deadlock, so if you have filebeats on both servers and they both have low thread numbers, which is common in low-volume J2EE servers, it may happen.

Note that in my solution the add_tag in the first aggregate is a no-op, since I end up deleting the event. When you use push_map_as_event_on_timeout you only get the things you added to the map. You could use

    map['tags'] = ['deadlock']

to add an array with a single member.

Finally, I sometimes want to use string interpolation in ruby code, and that only works in double quotes. So I always use single quotes to surround the code in both aggregate and ruby filters:

code => '
    map["log"] = event.get("mysql.message") + "\n";
    ...
'

Thank you, this is super helpful.

I will look into possibly doing this within filebeat as well; I agree that would be ideal.

Right now I'm happy to just have a solution. To address the possibility of multiple events with the same threadId, could I not also include a unique agent identifier from the event/metadata in the task_id?

Yes, you would just need additional fields added to the task_id.
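
For example, something like this in both aggregate filters (assuming filebeat is adding a host name field such as [host][name]; the exact field depends on your beat version):

task_id => "%{[host][name]}-%{[mysql.thread_id]}"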
