Your first problem is that none of your grok patterns actually match any of your log lines.
Note that \s is already a character class so you can use \s+ to match multiple spaces. No need to make it [\s]+
Next, if you look at these two patterns
    match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlcount}" ]
    match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlcount_param}" ]
they are identical. If the first one matches then grok will never try the second, and if the first does not match the second never will. I think the same is true of the two sqlfetch patterns, they will never match because if they were going to match then the sqlcount pattern would already have matched.
I suggest you take care of the first (common) part of the log lines with one grok, then match the rest of the line against a couple of patterns. Then decide what to do based on what fields get created.
Note that my patterns are anchored to start of line using ^. This makes things faster.
Note also the use of (\[%{WORD}\]\s+)? to consume the [RICERCA] that sometimes comes before the correlation id.
    # First pass: parse the common prefix, keep the remainder in @metadata.
    grok { match => { "message" => "^%{DATA:jcaption_id}\s+%{TIME:orario}\s+%{LOGLEVEL:log_level}\s+\[%{USERNAME:class}\]%{GREEDYDATA:[@metadata][restOfLine]}" } }
    # Second pass: try the more specific pattern first; grok stops at the first match.
    grok {
        match => {
            "[@metadata][restOfLine]" => [
                "^(\[%{WORD}\]\s+)?\[%{BASE10NUM:correlation_id}]\s+\[%{WORD:operation}\]\s+\[%{WORD:what}\]\s+%{GREEDYDATA:restOfLine}",
                "^(\[%{WORD}\]\s+)?\[%{BASE10NUM:correlation_id}]\s+\[%{WORD:operation}\]\s+%{GREEDYDATA:sqltime}"
            ]
        }
    }
    # PARAM lines: the last word of the remainder is the key.
    if [what] == "PARAM" {
        grok { match => { "restOfLine" => " %{WORD:key}$" } }
    }
    # Timing lines: replace the captured text with the number of seconds as a float.
    if [sqltime] {
        grok { match => { "sqltime" => "%{NUMBER:sqltime:float} seconds$" } overwrite => [ "sqltime" ] }
    }
    # Collect everything that shares a correlation_id into one event.
    aggregate {
        task_id => "%{correlation_id}"
        code => '
            # Sum the execution times reported for this correlation id.
            map["execution_time"] ||= 0
            executionTime = event.get("sqltime")
            if executionTime
                map["execution_time"] += executionTime
            end
            operation = event.get("operation")
            what = event.get("what")
            sql = event.get("restOfLine")
            key = event.get("key")
            if operation == "FETCH" and what == "SQL"
                map["sqlfetch"] = sql
            end
            if operation == "COUNT" and what == "SQL"
                map["sqlcount"] = sql
            end
            if key
                map["key"] = key
            end
        '
        # Emit the map as a new event once the task has been idle for 5 seconds.
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "correlation_id"
        timeout => 5
    }
You will need to expand this to add the other fields you want.
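
An added example, not part of the original answer: a plain-Ruby emulation of the second grok and the aggregate filter, showing why the more specific pattern has to be listed first. The regexes only approximate the grok patterns, and the sample lines and the helper names (grok, parse, aggregate) are constructed for illustration.

```ruby
# Regex stand-ins for the grok patterns (assumption: WORD ~ \w+, BASE10NUM ~ \d+).
PREFIX = /\A(?:\[\w+\]\s+)?\[(?<correlation_id>\d+)\]\s+\[(?<operation>\w+)\]\s+/
DETAIL = /#{PREFIX}\[(?<what>\w+)\]\s+(?<restOfLine>.*)\z/ # [op] [what] text
TIMING = /#{PREFIX}(?<sqltime>.*)\z/                        # [op] N seconds

# Constructed remainders, i.e. what [@metadata][restOfLine] might hold.
LINES = [
  "[RICERCA] [1001] [COUNT] [SQL] select count(*) from utenti",
  "[1001] [COUNT] 0.25 seconds",
  "[1001] [FETCH] [SQL] select * from utenti",
  "[1001] [FETCH] [PARAM] bound to cognome",
  "[1001] [FETCH] 0.5 seconds",
  "connection pool warm-up complete" # matches neither pattern
]

# Like grok with a pattern list: stop at the first pattern that matches.
def grok(text, patterns)
  patterns.each do |re|
    m = re.match(text)
    return m.named_captures if m
  end
  { "tags" => ["_grokparsefailure"] }
end

def parse(text, patterns = [DETAIL, TIMING])
  ev = grok(text, patterns)
  if ev["what"] == "PARAM" && (m = / (\w+)\z/.match(ev["restOfLine"]))
    ev["key"] = m[1]
  end
  if ev["sqltime"] && (m = /(\d+(?:\.\d+)?) seconds\z/.match(ev["sqltime"]))
    ev["sqltime"] = m[1].to_f # overwrite the string with the float
  end
  ev
end

# Same bookkeeping as the aggregate filter, keyed by correlation_id.
def aggregate(events)
  events.each_with_object({}) do |ev, tasks|
    next unless ev["correlation_id"]
    map = tasks[ev["correlation_id"]] ||= { "execution_time" => 0 }
    map["execution_time"] += ev["sqltime"] if ev["sqltime"].is_a?(Float)
    map["sqlfetch"] = ev["restOfLine"] if ev["operation"] == "FETCH" && ev["what"] == "SQL"
    map["sqlcount"] = ev["restOfLine"] if ev["operation"] == "COUNT" && ev["what"] == "SQL"
    map["key"] = ev["key"] if ev["key"]
  end
end

p aggregate(LINES.map { |l| parse(l) })                   # specific pattern first
p aggregate(LINES.map { |l| parse(l, [TIMING, DETAIL]) }) # swapped: SQL text is lost
```

With the order swapped, the looser timing pattern claims the SQL and PARAM lines as well, so sqlcount, sqlfetch and key never reach the aggregated map.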