Logstash aggregate filter not working for GCP MySQL slow query logs

Hi Team,

I'm trying to send Google Cloud MySQL slow query logs from GCP Cloud Logging to Elasticsearch through the Logstash Pub/Sub input.

The slow query logs are in JSON format, and Pub/Sub has a limitation: it does not preserve event order when messages are delivered to the subscriber (Logstash).

Hence, I'm trying to aggregate the events based on their receiveTimestamp, but with no luck so far.
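For context, the Pub/Sub side of my Logstash pipeline looks roughly like this (a minimal sketch using the google_pubsub input plugin; the project, topic, subscription and key file values below are placeholders, not my real values):

    input {
        google_pubsub {
            # placeholder values - substitute your own project/topic/subscription/key
            project_id    => "my-gcp-project"
            topic         => "cloudsql-slow-log-topic"
            subscription  => "cloudsql-slow-log-sub"
            json_key_file => "/etc/logstash/gcp-key.json"
            codec         => "json"
        }
    }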

The source JSON events look something like this:

{
	"@timestamp":"2022-03-29T04:00:24.177Z",
	"message": "original_event_3",
	"textPayload": "# Query_time: 10.183602  Lock_time: 0.000386 Rows_sent: 823492  Rows_examined: 825476",
	"tags": ["cloudsql-slow-log"],
	"receiveTimestamp": "2022-03-29T04:00:36.305655815Z",
	"cloud.project.id": "ABC"
}
{
	"@timestamp":"2022-03-29T04:00:30.177Z",
	"message": "original_event_1",
	"textPayload": "# Time: 2022-03-29T08:37:50.228345Z",
	"tags": ["cloudsql-slow-log"],
	"receiveTimestamp": "2022-03-29T04:00:36.305655815Z",
	"cloud.project.id": "ABC"
}
{
	"@timestamp":"2022-03-29T04:00:40.177Z",
	"message": "original_event_6",
	"textPayload": "FROM reporting.v_v3_profile_identities",
	"tags": ["cloudsql-slow-log"],
	"receiveTimestamp": "2022-03-29T04:00:36.305655815Z",
	"cloud.project.id": "ABC"
}
{
	"@timestamp":"2022-03-29T04:00:55.177Z",
	"message": "original_event_4",
	"textPayload": "SET timestamp=1648543070;",
	"tags": ["cloudsql-slow-log"],
	"receiveTimestamp": "2022-03-29T04:00:36.305655815Z",
	"cloud.project.id": "ABC"
}
.......

The desired output should look something like this:

{
	"@timestamp":"2022-03-29T04:00:30.177Z",
	"textPayload": "# Time: 2022-03-29T08:37:50.228345Z
					# User@Host: mule_transaction_report[mule_transaction_report] @  [194.255.15.66]  thread_id: 666318  server_id: 1912109277
					# Query_time: 10.183602  Lock_time: 0.000386 Rows_sent: 823492  Rows_examined: 825476
					SET timestamp=1648543070;
					SELECT identifier as GCNUMBER
					FROM reporting.v_v3_profile_identities 
					where handle ='card_pos' 
					and identifier >= '80029914'
					and identifier <= '83539938'
					order by identifier asc;",
	"tags": ["cloudsql-slow-log"],
	"receiveTimestamp": "2022-03-29T04:00:36.305655815Z",
	"cloud.project.id": "ABC"
}

You need to share your Logstash configuration, including your aggregate filter.

But the aggregate filter aggregates events based on a unique id in the order in which the filter receives them; if your events are already out of order before entering the Logstash pipeline, the aggregate filter alone won't fix that.

You could try

    aggregate {
        task_id => "%{receiveTimestamp}"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "receiveTimestamp"
        timeout => 3
        code => '
            map["textPayload"] ||= []
            map["@timestamp"] ||= event.get("@timestamp")
            map["tags"] ||= event.get("tags")
            map["cloud.project.id"] ||= event.get("cloud.project.id")
            # extract N from "original_event_N" and use it to slot each line back into its original position
            i = event.get("message").sub(/\D+/, "").to_i
            map["textPayload"][i-1] = event.get("textPayload")
            event.cancel
        '
        timeout_code => '
            # This code operates on the generated event,
            # not on the map from which it is generated.
            event.set("textPayload", event.get("textPayload").join("\n"))
        '
    }

which, for the 4 events you show, will result in

{
      "@timestamp" => 2022-03-29T04:00:24.177Z,
            "tags" => [ "cloudsql-slow-log" ],
"receiveTimestamp" => "2022-03-29T04:00:36.305655815Z",
"cloud.project.id" => "ABC",
     "textPayload" => "# Time: 2022-03-29T08:37:50.228345Z\n\n# Query_time: 10.183602  Lock_time: 0.000386 Rows_sent: 823492  Rows_examined: 825476\nSET timestamp=1648543070;\n\nFROM reporting.v_v3_profile_identities"
}
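
One additional note: as the aggregate filter documentation points out, this approach assumes the pipeline runs with a single worker (pipeline.workers: 1, or -w 1 on the command line). With multiple workers, events sharing the same receiveTimestamp can be processed on different threads and the map may not be built reliably. The timeout of 3 seconds is also just an example value; it needs to be longer than the maximum gap between the first and last message of a slow query entry arriving from Pub/Sub.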