Hi there,
Thanks in advance for your time.
I'm trying to use filter { aggregate { } } while reading/parsing an old log file.
In this log file, every second, a block of statistics is generated that I'd like to aggregate together. For instance, something like =>
....
2020-07-27 15:46:11.151282698 line 1
2020-07-27 15:46:11.151282207 line 2
2020-07-27 15:46:11.151283514 line 3
2020-07-27 15:46:11.155536588 {keyword} stats of AAA I(1, 2) .. IP(3, 4) ..
2020-07-27 15:46:11.155567522 {keyword} stats of BBB I(5, 6) .. IP(7, 8) ..
2020-07-27 15:46:11.155578544 {keyword} stats of CCC I(9, 10) .. IP(11, 12) ..
2020-07-27 15:46:11.195195669 line 4
.....
2020-07-27 15:46:12.155536588 {keyword} stats of AAA I(45, 46) .. IP(47, 48) ..
2020-07-27 15:46:12.155567522 {keyword} stats of BBB I(49, 50) .. IP(51, 52) ..
.....
(... values are fake and random ....)
I'm first filtering the lines where the keyword exists: if [message] =~ / \{keyword\} / {
Then, from those lines, I extract creationtime, I_sec, I_tot, IP_sec and IP_tot, the lines being structured like:
creationtime .. {action} .. I(I_tot, I_sec) .. IP(IP_tot, IP_sec) ..
... with [action] being my keyword.
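For illustration (field names taken from my grok config below), grokking the first AAA line above should give me an event with roughly these fields:
creationtime => "2020-07-27 15:46:11.155536588"
action       => "keyword"
I_tot        => 1
I_sec        => 2
IP_tot       => 3
IP_sec       => 4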
Everything's fine so far: all my documents are successfully inserted into Elasticsearch.
But I'm missing the aggregated fields I_sec_sum, I_tot_sum, IP_sec_sum and IP_tot_sum, which are supposed to be the sum of each attribute per second. E.g. in the example above, for second :11, I_sec_sum would be 2 + 6 + 10 and IP_tot_sum would be 3 + 7 + 11.
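So, sticking with the fake values above, the aggregated document I'd like to get for second :11 would look roughly like this (all four sums shown for illustration):
{ "action": "keyword", "I_sec_sum": 18, "I_tot_sum": 15, "IP_sec_sum": 24, "IP_tot_sum": 21 }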
My filter looks like =>
filter {
  if [type] == "mytype" {
    if [message] =~ / \{keyword\} / {
      grok {
        pattern_definitions => {
          "TIME1" => "%{YEAR}-%{MONTHNUM2}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}"
        }
        match => { "message" => "%{TIME1:creationtime} %{GREEDYDATA:to-delete-1} \{%{WORD:action}\} %{GREEDYDATA:to-delete-2} I\(%{INT:I_tot:int}, %{INT:I_sec:int}\) %{GREEDYDATA:to-delete-3} IP\(%{INT:IP_tot:int}, %{INT:IP_sec:int}\) %{GREEDYDATA:to-delete-4}" }
      }
      date {
        locale => "en"
        match => ["creationtime", "YYYY-MM-dd HH:mm:ss.SSSSSSSSS"]
        target => "creationtime"
      }
      aggregate {
        timeout_timestamp_field => "creationtime"
        task_id => "%{action}"
        code => "map['I_sec_sum'] ||= 0; map['I_sec_sum'] += event.get('I_sec');"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "action"
        inactivity_timeout => 0.3
      }
      mutate { remove_field => [ "host", "sequence", "path", "to-delete-1", "to-delete-2", "to-delete-3", "to-delete-4", "message", "qs_id", "action" ] }
    } else {
      drop {}
    }
  }
}
I've started to test only with I_sec_sum for now; the code option is of course not complete yet.
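Once I_sec_sum works, I'd probably extend the code option along these lines (a sketch only, repeating the same pattern for the three other counters):
code => "
  map['I_sec_sum']  ||= 0; map['I_sec_sum']  += event.get('I_sec');
  map['I_tot_sum']  ||= 0; map['I_tot_sum']  += event.get('I_tot');
  map['IP_sec_sum'] ||= 0; map['IP_sec_sum'] += event.get('IP_sec');
  map['IP_tot_sum'] ||= 0; map['IP_tot_sum'] += event.get('IP_tot');
"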
I actually have no errors in the Logstash logs, and the correct number of lines containing the keyword are inserted into Elasticsearch ... but not the events that I expected to be triggered after my inactivity_timeout of 0.3 sec ...
Any idea what could be wrong here? It looks like the aggregate filter never triggers any document ...
From my understanding, I'd expect the timer to be reset to 0 every time a line with the same task_id is detected, and the document below to be pushed when the timer reaches inactivity_timeout: { "action": "keyword", "I_sec_sum": 18 }
Thanks !!
Guillaume