Aggregate filter plugin

Your first problem is that none of your grok patterns actually match any of your log lines.

Note that \s is already a character class so you can use \s+ to match multiple spaces. No need to make it [\s]+

Next, if you look at these two patterns

  match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlcount}" ]
  match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlcount_param}" ]

they are identical. If the first one matches then grok will never try to second, and if the first does not match the second never will. I think the same is true of the two sqlfetch patterns, they will never match because if they were going to match then the sqlcount pattern would already have matched.

I suggest you take of the first (common) part of the log lines with one grok, then match rest of the line against a couple of patterns. Then decide what to do based on what fields get created.

Note that my patterns are anchored to start of line using ^. This makes things faster.

Note also the use of ([%{WORD}]\s+)? to consume the [RICERCA] that sometimes comes before the correlation id.

grok { match => { "message" => "^%{DATA:jcaption_id}\s+%{TIME:orario}\s+%{LOGLEVEL:log_level}\s+\[%{USERNAME:class}\]%{GREEDYDATA:[@metadata][restOfLine]}" } }
grok {
    match => {
        "[@metadata][restOfLine]" => [
            "^(\[%{WORD}\]\s+)?\[%{BASE10NUM:correlation_id}]\s+\[%{WORD:operation}\]\s+\[%{WORD:what}\]\s+%{GREEDYDATA:restOfLine}",
            "^(\[%{WORD}\]\s+)?\[%{BASE10NUM:correlation_id}]\s+\[%{WORD:operation}\]\s+%{GREEDYDATA:sqltime}"
        ]
    }
}
if [what] == "PARAM" {
    grok { match => { "restOfLine" => " %{WORD:key}$" } }
}
if [sqltime] {
    grok { match => { "sqltime" => "%{NUMBER:sqltime:float} seconds$" } overwrite => [ "sqltime" ] }
}
aggregate {
    task_id => "%{correlation_id}"
    code => '
        map["execution_time"] ||= 0
        executionTime = event.get("sqltime")
        if executionTime
            map["execution_time"] += executionTime
        end

        operation = event.get("operation")
        what = event.get("what")
        sql = event.get("restOfLine")
        key = event.get("key")

        if operation == "FETCH" and what == "SQL"
            map["sqlfetch"] = sql
        end
        if operation == "COUNT" and what == "SQL"
            map["sqlcount"] = sql
        end
        if key
            map["key"] = key
        end
    '
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "correlation_id"
    timeout => 5
}

You will need to expand this to add the other fields you want.