Logstash aggregate filter - usage help needed

Hi,

I am using the aggregate filter to read repeating blocks of log data. I need one aggregated record per block (everything between "CD playback started" and "CD playback ended"), which I am currently unable to produce.

Log file:

Nov 30,15:02:42.081 INFO CD playback started
Nov 30,15:02:42.082 INFO disk selected = disk1
Nov 30,15:02:42.084 INFO No. of songs available:12
Nov 30,15:02:42.085 INFO Total playback time = 62 minutes
Nov 30,15:02:42.086 INFO Song selected: song1
Nov 30,15:02:42.087 INFO CD playback ended
Nov 30,15:02:44.082 INFO CD playback started
Nov 30,15:02:44.083 INFO disk selected = disk2
Nov 30,15:02:44.084 INFO No. of songs available:11
Nov 30,15:02:44.086 INFO Total playback time = 59 minutes
Nov 30,15:02:44.087 INFO Song selected: song1
Nov 30,15:02:44.088 INFO CD playback ended
Nov 30,15:02:48.080 INFO CD playback started
Nov 30,15:02:48.082 INFO disk selected = disk1
Nov 30,15:02:48.084 INFO No. of songs available:12
Nov 30,15:02:48.086 INFO Total playback time = 62 minutes
Nov 30,15:02:48.088 INFO Song selected: song4
Nov 30,15:02:48.089 INFO CD playback ended

Logstash conf file:

# Read log lines from standard input.
# The "type" option stamps every event with type = "stdin".
input {
	stdin {
        type => "stdin"
    } 
}
# Parse each log line and try to aggregate the lines of one
# "CD playback started" ... "CD playback ended" block into a single map.
filter {
	# Split the line into timestamp parts (kept in @metadata), the log level and
	# the free-text message, then rebuild "log_timestamp" from the parts.
	grok {
# 		add_tag => [ "valid" ]
		match => { "message" => "%{MONTH:[@metadata][month]} %{MONTHDAY:[@metadata][day]},%{TIME:[@metadata][time]}%{SPACE}%{LOGLEVEL:log_level}%{SPACE}%{GREEDYDATA:log_message}" }
		add_field => {"log_timestamp" => "%{[@metadata][month]} %{[@metadata][day]},%{[@metadata][time]}"}
   } 
   
  
### CD-playback start
    if [log_message] =~ "CD playback started" {
		# Match-anything grok, used only to tag the event as "valid".
		grok {
			match => { "log_message" => ["%{GREEDYDATA}"] }
			add_tag => [ "valid" ]
		}
		# NOTE(review): "diskSelected" is only extracted from the "disk selected"
		# line below, so on this event the task_id has no field value to resolve to.
		# The code is empty, so nothing is stored in the map at creation time.
		aggregate {
			task_id => "%{diskSelected}"
			code => ""
			map_action => "create"
		}
    }

### Get the selected disk name
    if [log_message] =~ "disk selected" {
		grok {
			match => { "log_message" => ["disk selected = %{WORD:diskSelected}"] }
			add_tag => [ "valid" ]
		}
		# Stores the disk name in map['d'] unless map['d'] is already set.
		# map_action "update" only runs when a map for this task_id already exists.
		aggregate {
			task_id => "%{diskSelected}"
			code => "map['d'] ||= event.get('diskSelected')"
			map_action => "update"
		}
		# Non-core "memorize" filter; presumably meant to carry diskSelected over
		# to later events -- TODO confirm, it only sees events inside this branch.
		memorize {
			fields => ["diskSelected"]
			default => { "diskSelected" => "NOTFOUND" }
		}
    }

### Get the no. of songs available in the selected disk
	if [log_message] =~ "No. of songs available" {
		grok {
			add_tag => [ "valid" ]
			match => { "log_message" => ["No. of songs available:%{NUMBER:NoOfSongsAvailable}"] }
		}
		# NOTE(review): here map['d'] is treated as a hash keyed by the song count,
		# while the other steps assign a plain value to map['d'] -- the uses are
		# inconsistent. The stored value is the text '%{diskSelected}' written
		# inside the Ruby code; presumably it is not substituted -- TODO confirm.
		aggregate {
			task_id => "%{diskSelected}"
#			code => "map['d'] ||= event.get('NoOfSongsAvailable')"
			code => "map['d'] ||= {}; map['d'][event.get('NoOfSongsAvailable')] ||= {}; map['d'][event.get('NoOfSongsAvailable')] = ('%{diskSelected}')"
			map_action => "update"
		}
	}
	
### Get the total playback time of the selected disk
# (disabled: while this section is commented out, "PlaybackTime" is never extracted)
#	if [log_message] =~ "Total playback time" {
#		grok {
#			add_tag => [ "valid" ]
#			match => { "log_message" => ["Total playback time = %{NUMBER:PlaybackTime} minutes"] }
#		}
#		aggregate {
#			task_id => "%{diskSelected}"
#			code => "map['d'] ||= event.get('PlaybackTime')"
#			map_action => "update"
#		}
#	}

### Get the selected song of the selected disk
	if [log_message] =~ "Song selected" {
		grok {
			add_tag => [ "valid" ]
			match => { "log_message" => ["Song selected: %{WORD:SongSelected}"] }
		}
		# "||=" assigns only when map['d'] is still unset, so the song name is
		# not stored if an earlier step already wrote map['d'].
		aggregate {
			task_id => "%{diskSelected}"
			code => "map['d'] ||= event.get('SongSelected')"
			map_action => "update"
		}
	}

### CD-playback end
    if [log_message] =~ "CD playback ended" {
		grok {
			add_tag => [ "valid" ]
			match => { "log_message" => ["%{GREEDYDATA}"] }
		}	
		# Copy the aggregated value onto the closing event as field "d" and end
		# the task (end_of_task => true). The timeout is set to 120.
		aggregate {
			task_id => "%{diskSelected}"
			code => "event.set('d', map['d'])"
 			timeout => 120
#			map_action => "update"	
			end_of_task => true
		}
    }

	# Discard every event for which a grok above failed to match.
	if "_grokparsefailure" in [tags] {            
		drop { }
	}
   
	# Parse the rebuilt timestamp string and write the result back into
	# "log_timestamp", using the Asia/Jerusalem timezone.
	date {
		match => [ "log_timestamp", "MMM dd,HH:mm:ss.SSS"]
		timezone => 'Asia/Jerusalem'
		target => "log_timestamp"
	}
   
	# Remove the raw line and convert the numeric fields to float.
	# "PlaybackTime" is only set if the commented-out section above is enabled.
	mutate {
		remove_field => ["message"]
#		remove_field => ["log_timestamp"]
		convert => { 
			"NoOfSongsAvailable" => "float"
			"PlaybackTime" => "float"
		}
	}
}

# Send every surviving event to two destinations.
output {
# Console output in rubydebug format.
stdout { codec => rubydebug }
    # Index the event into the "cdlog" index on the Elasticsearch host "localhost".
    elasticsearch {
		hosts => "localhost"
		index => "cdlog"
	}
}

Here, I will need data like this : (Assume Kibana data table)

 Selected disk	|No. of songs	|Total playback time	|Song selected
 disk1			|12				|62					    |song1
 disk2			|11				|59				        |song1
 disk1			|12				|62				        |song4

I am not able to parse and store the data in a format from which I can build the table above. In other words, each CD run should produce one record holding its [disk, song] pair together with the properties of that disk (number of songs and total playback time).

Can someone help, please?

Regards,
Ruthu

@fbaligand, could you please let me know whether my expectation/usage of the aggregate plugin itself is wrong? If it is not, could you kindly suggest a fix? Thank you,

Regards,
Ruthu

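So, the first problem is that the "task_id" option must reference a field which is present in every log line.
It is a correlation id: all events that should end up in the same aggregation map must produce the same task_id value.
In your case, "%{type}" is fine, because the stdin input sets "type" on every event.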

Secondly, you should handle the "d" field in the aggregation map consistently: create it once as a hash, then store each value under its own key.

Here's a configuration that should solve your problem :

input {
	# Events are read from standard input. The "type" option stamps every event
	# with type = "stdin"; the aggregate filters use "%{type}" as their task_id.
	stdin { type => "stdin" }
}
# Parse each log line and aggregate the lines of one
# "CD playback started" ... "CD playback ended" block into a hash that is
# attached to the closing event as field "d":
#   d.diskSelected, d.NoOfSongsAvailable, d.PlaybackTime, d.SongSelected
#
# All aggregate filters share task_id "%{type}", a field present on every event.
# NOTE: the aggregate filter needs events to arrive in order, so run Logstash
# with a single pipeline worker (-w 1).
filter {
	# Split the line into timestamp parts (kept in @metadata), the log level and
	# the free-text message, then rebuild "log_timestamp" from the parts.
	grok {
#		add_tag => [ "valid" ]
		match => { "message" => "%{MONTH:[@metadata][month]} %{MONTHDAY:[@metadata][day]},%{TIME:[@metadata][time]}%{SPACE}%{LOGLEVEL:log_level}%{SPACE}%{GREEDYDATA:log_message}" }
		add_field => {"log_timestamp" => "%{[@metadata][month]} %{[@metadata][day]},%{[@metadata][time]}"}
	}

### CD-playback start
	if [log_message] =~ "CD playback started" {
		# Match-anything grok, used only to tag the event as "valid".
		grok {
			match => { "log_message" => ["%{GREEDYDATA}"] }
			add_tag => [ "valid" ]
		}
		# Start a new task: "d" begins as an empty hash that the following
		# steps fill key by key.
		aggregate {
			task_id => "%{type}"
			code => "map['d'] = {}"
			map_action => "create"
		}
	}

### Get the selected disk name
	if [log_message] =~ "disk selected" {
		grok {
			match => { "log_message" => ["disk selected = %{WORD:diskSelected}"] }
			add_tag => [ "valid" ]
		}
		aggregate {
			task_id => "%{type}"
			code => "map['d']['diskSelected'] = event.get('diskSelected')"
			map_action => "update"
		}
		# Kept from the original configuration (non-core "memorize" plugin).
		# It is not needed for correlation now that task_id is "%{type}" --
		# TODO confirm whether it is still wanted.
		memorize {
			fields => ["diskSelected"]
			default => { "diskSelected" => "NOTFOUND" }
		}
	}

### Get the no. of songs available in the selected disk
	if [log_message] =~ "No. of songs available" {
		grok {
			add_tag => [ "valid" ]
			match => { "log_message" => ["No. of songs available:%{NUMBER:NoOfSongsAvailable}"] }
		}
		aggregate {
			task_id => "%{type}"
			code => "map['d']['NoOfSongsAvailable'] = event.get('NoOfSongsAvailable')"
			map_action => "update"
		}
	}

### Get the total playback time of the selected disk
	# Enabled so that d.PlaybackTime is populated; the mutate/convert at the end
	# of this filter already expects "[d][PlaybackTime]".
	if [log_message] =~ "Total playback time" {
		grok {
			add_tag => [ "valid" ]
			match => { "log_message" => ["Total playback time = %{NUMBER:PlaybackTime} minutes"] }
		}
		aggregate {
			task_id => "%{type}"
			code => "map['d']['PlaybackTime'] = event.get('PlaybackTime')"
			map_action => "update"
		}
	}

### Get the selected song of the selected disk
	if [log_message] =~ "Song selected" {
		grok {
			add_tag => [ "valid" ]
			match => { "log_message" => ["Song selected: %{WORD:SongSelected}"] }
		}
		aggregate {
			task_id => "%{type}"
			code => "map['d']['SongSelected'] = event.get('SongSelected')"
			map_action => "update"
		}
	}

### CD-playback end
	if [log_message] =~ "CD playback ended" {
		grok {
			add_tag => [ "valid" ]
			match => { "log_message" => ["%{GREEDYDATA}"] }
		}
		# Copy the aggregated hash onto the closing event as field "d" and end
		# the task, so the next "CD playback started" line can create a fresh map.
		# The timeout is declared only here, once for this task_id pattern.
		aggregate {
			task_id => "%{type}"
			code => "event.set('d', map['d'])"
			timeout => 120
			map_action => "update"
			end_of_task => true
		}
	}

	# Discard every event for which a grok above failed to match.
	if "_grokparsefailure" in [tags] {
		drop { }
	}

	# Parse the rebuilt timestamp string and write the result back into
	# "log_timestamp", using the Asia/Jerusalem timezone.
	date {
		match => [ "log_timestamp", "MMM dd,HH:mm:ss.SSS"]
		timezone => 'Asia/Jerusalem'
		target => "log_timestamp"
	}

	# Remove the raw line and convert the numeric fields to float, both on the
	# individual line events and inside the aggregated "d" hash.
	mutate {
		remove_field => ["message"]
#		remove_field => ["log_timestamp"]
		convert => {
			"NoOfSongsAvailable" => "float"
			"PlaybackTime" => "float"
			"[d][NoOfSongsAvailable]" => "float"
			"[d][PlaybackTime]" => "float"
		}
	}
}

output {
	# Print every event to the console in rubydebug format.
	stdout {
		codec => rubydebug
	}

	# Index every event into the "cdlog" index on the Elasticsearch host "localhost".
	elasticsearch {
		index => "cdlog"
		hosts => "localhost"
	}
}
1 Like

Hi @fbaligand,

First of all, thanks for your time and responding to the query.

  • I would like to know what the "%{type}" mentioned above is and where it comes from.
  • I think I cannot generate the disk-wise statistics shown in the table in my original post using the aggregate filter, since the disk information is not present in every line. Am I correct here?

If you could kindly clarify these, I can close it as resolved since rest of the stuff from your post is clear to me.

Regards,
Ruthu

The "type" field is set by your input "stdin".
So naturally, you have this field set on every Logstash event.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.