Hi,
I am using the aggregate filter to read repeating blocks of log data. I need block-wise data which at this moment I am not able to get.
Log file:
Nov 30,15:02:42.081 INFO CD playback started
Nov 30,15:02:42.082 INFO disk selected = disk1
Nov 30,15:02:42.084 INFO No. of songs available:12
Nov 30,15:02:42.085 INFO Total playback time = 62 minutes
Nov 30,15:02:42.086 INFO Song selected: song1
Nov 30,15:02:42.087 INFO CD playback ended
Nov 30,15:02:44.082 INFO CD playback started
Nov 30,15:02:44.083 INFO disk selected = disk2
Nov 30,15:02:44.084 INFO No. of songs available:11
Nov 30,15:02:44.086 INFO Total playback time = 59 minutes
Nov 30,15:02:44.087 INFO Song selected: song1
Nov 30,15:02:44.088 INFO CD playback ended
Nov 30,15:02:48.080 INFO CD playback started
Nov 30,15:02:48.082 INFO disk selected = disk1
Nov 30,15:02:48.084 INFO No. of songs available:12
Nov 30,15:02:48.086 INFO Total playback time = 62 minutes
Nov 30,15:02:48.088 INFO Song selected: song4
Nov 30,15:02:48.089 INFO CD playback ended
Logstash conf file:
input {
stdin {
type => "stdin"
}
}
filter {
grok {
# add_tag => [ "valid" ]
match => { "message" => "%{MONTH:[@metadata][month]} %{MONTHDAY:[@metadata][day]},%{TIME:[@metadata][time]}%{SPACE}%{LOGLEVEL:log_level}%{SPACE}%{GREEDYDATA:log_message}" }
add_field => {"log_timestamp" => "%{[@metadata][month]} %{[@metadata][day]},%{[@metadata][time]}"}
}
### CD-playback start
if [log_message] =~ "CD playback started" {
grok {
match => { "log_message" => ["%{GREEDYDATA}"] }
add_tag => [ "valid" ]
}
aggregate {
task_id => "%{diskSelected}"
code => ""
map_action => "create"
}
}
### Get the selected disk name
if [log_message] =~ "disk selected" {
grok {
match => { "log_message" => ["disk selected = %{WORD:diskSelected}"] }
add_tag => [ "valid" ]
}
aggregate {
task_id => "%{diskSelected}"
code => "map['d'] ||= event.get('diskSelected')"
map_action => "update"
}
memorize {
fields => ["diskSelected"]
default => { "diskSelected" => "NOTFOUND" }
}
}
### Get the no. of songs available in the selected disk
if [log_message] =~ "No. of songs available" {
grok {
add_tag => [ "valid" ]
match => { "log_message" => ["No. of songs available:%{NUMBER:NoOfSongsAvailable}"] }
}
aggregate {
task_id => "%{diskSelected}"
# code => "map['d'] ||= event.get('NoOfSongsAvailable')"
code => "map['d'] ||= {}; map['d'][event.get('NoOfSongsAvailable')] ||= {}; map['d'][event.get('NoOfSongsAvailable')] = ('%{diskSelected}')"
map_action => "update"
}
}
### Get the total playback time of the selected disk
# if [log_message] =~ "Total playback time" {
# grok {
# add_tag => [ "valid" ]
# match => { "log_message" => ["Total playback time = %{NUMBER:PlaybackTime} minutes"] }
# }
# aggregate {
# task_id => "%{diskSelected}"
# code => "map['d'] ||= event.get('PlaybackTime')"
# map_action => "update"
# }
# }
### Get the selected song of the selected disk
if [log_message] =~ "Song selected" {
grok {
add_tag => [ "valid" ]
match => { "log_message" => ["Song selected: %{WORD:SongSelected}"] }
}
aggregate {
task_id => "%{diskSelected}"
code => "map['d'] ||= event.get('SongSelected')"
map_action => "update"
}
}
### CD-playback end
if [log_message] =~ "CD playback ended" {
grok {
add_tag => [ "valid" ]
match => { "log_message" => ["%{GREEDYDATA}"] }
}
aggregate {
task_id => "%{diskSelected}"
code => "event.set('d', map['d'])"
timeout => 120
# map_action => "update"
end_of_task => true
}
}
if "_grokparsefailure" in [tags] {
drop { }
}
date {
match => [ "log_timestamp", "MMM dd,HH:mm:ss.SSS"]
timezone => 'Asia/Jerusalem'
target => "log_timestamp"
}
mutate {
remove_field => ["message"]
# remove_field => ["log_timestamp"]
convert => {
"NoOfSongsAvailable" => "float"
"PlaybackTime" => "float"
}
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => "localhost"
index => "cdlog"
}
}
Here, I will need data like this : (Assume Kibana data table)
Selected disk |No. of songs |Total playback time |Song selected
disk1 |12 |62 |song1
disk2 |11 |59 |song1
disk1 |12 |62 |song4
I am not able to parse/store data in a format where I can get the above table statistics. Meaning each CD run corresponds to a [disk, song] and each disk has certain properties
Can someone help please.
Regards,
Ruthu