Using the aggregate filter to count events and sum a field's values

Hi all,
I have an Elasticsearch index named "myindex" with fields such as "date_eng", "company_name", "service_name", and "amount". I want to use the aggregate filter to count how many times each service was used by each company yesterday, and also to sum the amount for each company/service pair. For that I use company_name, service_name, and the date as the aggregate task_id. For example, the data looks like this:

{"date_eng":2020-08-09, "company_name": "cm1","service_name":"ser1","amount":100} 
{"date_eng":2020-08-09, "company_name": "cm1","service_name":"ser1","amount":0} 
{"date_eng":2020-08-09, "company_name": "cm1","service_name":"ser1","amount":10} 
{"date_eng":2020-08-09, "company_name": "cm1","service_name":"ser2","amount":5} 
{"date_eng":2020-08-09, "company_name": "cm1","service_name":"ser2","amount":10} 
{"date_eng":2020-08-09, "company_name": "cm2","service_name":"ser1","amount":0} 
{"date_eng":2020-08-09, "company_name": "cm2","service_name":"ser2","amount":50} 

The expected output is:

 cm1-ser1-2020-08-09-3-110
 cm1-ser2-2020-08-09-2-15
 cm2-ser1-2020-08-09-1-0
 cm2-ser2-2020-08-09-1-50

For this purpose I used the following Logstash pipeline:

input {
    elasticsearch {
        hosts => ["http://10.0.1.1:9200/"]
        index => "myindex*"
        query => '{
            "query": {
                "bool": {
                    "filter": {
                        "range": { "mytimestamp": { "gte": "now-1d/d", "lte": "now-1d/d" } }
                    }
                }
            },
            "sort": [ "_doc" ]
        }'
    }
}
filter {

    mutate {
        split => ["date_eng", "-"]
        add_field => {
            "year"   => "%{date_eng[0]}"
            "mounth" => "%{date_eng[1]}"
            "day"    => "%{date_eng[2]}"
        }
    }

    mutate {
        add_field => { "aggregate_id" => "%{year}_%{mounth}_%{day}_%{company_name}_%{service_name}" }
    }

    aggregate {
        task_id => "%{aggregate_id}"
        code => "
            map['value'] ||= 0
            map['count'] ||= 0
            map['value'] += event.get('amount')
            map['count'] += 1
            event.set('company_value', map['value'])
            event.set('company_count', map['count'])
        "
    }

    mutate {
        add_field => {
            "My_Data" => "%{company_name}-%{service_name}-%{year}-%{mounth}-%{day}-%{company_count}-%{company_value}"
        }
    }

}
output {
    csv {
        fields => ["My_Data"]
        path => "D:\my data\CS_%{year}_%{mounth}_%{day}.txt"
    }

    stdout { codec => rubydebug }
}

But when I run the pipeline, the result is not what I expect:

  cm1-ser1-2020-08-09-2-100
  cm1-ser1-2020-08-09-3-110
  cm1-ser2-2020-08-09-1-10
  cm1-ser2-2020-08-09-2-15
  cm2-ser1-2020-08-09-1-0
  cm2-ser2-2020-08-09-1-50

Any advice would be much appreciated. Many thanks.

I would suggest doing the aggregation in elasticsearch.

Many thanks for your reply. Actually, my ELK license is Basic and I want to schedule the reporting procedure, which cannot be done with a Basic license. I would appreciate any advice on how to handle this case using Logstash. Many thanks.

It is not a licence issue. You are running an elasticsearch query to fetch data that you want to aggregate. I am suggesting you try to do the aggregation as part of the query.

Do you mean I should use an aggregation in the elasticsearch input query used by Logstash?

Yes.
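For example, a terms aggregation per company and service with a sum sub-aggregation would look roughly like this. This is only a sketch; the .keyword subfields and the "mytimestamp" field are assumptions about your mapping:

{
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "range": { "mytimestamp": { "gte": "now-1d/d", "lte": "now-1d/d" } }
      }
    }
  },
  "aggs": {
    "by_company": {
      "terms": { "field": "company_name.keyword" },
      "aggs": {
        "by_service": {
          "terms": { "field": "service_name.keyword" },
          "aggs": {
            "total_amount": { "sum": { "field": "amount" } }
          }
        }
      }
    }
  }
}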

Many thanks for your reply. Actually, one of my fields is mapped as text, so I cannot use a sum aggregation on it; instead I am converting the field type on the Logstash side (from string to integer). Could you please advise me how to handle the issue from my first post? For each input event with a given task_id, the aggregate filter emits an output event, whereas I expect a single output event covering all events that share the same task_id.
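The conversion on the Logstash side is roughly this (a sketch; the field name is from my example data):

mutate {
    convert => { "amount" => "integer" }
}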

OK, assuming your data is sorted you can do something similar to example 4 in the aggregate filter documentation. Use event.cancel to drop the individual data rows, and push_previous_map_as_event to create an event once all of the data for a given id has been seen. Note that only the data in the map is pushed as part of that event, so make sure you add everything you want in the final event to the map.

You must disable java_execution for this to work.
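A minimal sketch of that approach, using the field names from your pipeline (untested; the map keys are only suggestions):

aggregate {
    task_id => "%{aggregate_id}"
    code => "
        # count and sum inside the map only; the map becomes the pushed event
        map['company_count'] ||= 0
        map['company_count'] += 1
        map['company_value'] ||= 0
        map['company_value'] += event.get('amount')   # assumes amount is already numeric
        # copy everything the final event needs into the map
        map['company_name'] ||= event.get('company_name')
        map['service_name'] ||= event.get('service_name')
        map['year']   ||= event.get('year')
        map['mounth'] ||= event.get('mounth')
        map['day']    ||= event.get('day')
        # drop the original row
        event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3600
}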

It is so weird. When I add "event.cancel()" and "push_previous_map_as_event => true" to the aggregate filter, the Logstash output is as follows, which means that none of the fields has a value:

%{company_name}-%{service_name}-%{year}-%{mounth}-%{day}-1
%{company_name}-%{service_name}-%{year}-%{mounth}-%{day}-2
...

And when I remove those two settings, the output is the same as before.

If you do event.cancel then the original data rows are deleted. The aggregate generates new events as it processes each group of rows. When those events get to

mutate {
    add_field => {
        "My_Data" => "%{company_name}-%{service_name}-%{year}-%{mounth}-%{day}-%{company_count}-%{company_value}"
    }
}

None of those fields will exist unless you added them to the map when processing the original rows.

map['value'] ||= 0
map['count'] ||= 0

The events will have fields called [value] and [count] since those exist in the map.

Many thanks for your reply. I added the fields to the map as follows:

 map['Company_name'] ||= event.get('Company_name')
 map['Service_name'] ||= event.get('Service_name')
 map['year'] ||= event.get('year')
 map['mounth'] ||= event.get('mounth')
 map['day'] ||= event.get('day')

The output changes and has fewer events, but it is still not correct.

When I use "year" or "mounth" or "day" as task_id it works correctly and counts all the events (note that these three fields have the same value in all events). But when I use "Company_name" or "Service_name", the output is not as expected.

Field names are case sensitive. Company_name and company_name are different fields. Similarly for service_name.

No, that is just a typo in my post; they are correct in the script.

OK, so what does the filter section look like now?

My aggregate filter is now as follows:

mutate {
    add_field => { "aggregate_id" => "%{year}_%{mounth}_%{day}_%{Company_name}_%{Service_name}" }
}
aggregate {
    task_id => "%{Company_name}"
    code => "
        map['company_count'] ||= 0
        map['company_count'] += 1
        map['Service_name'] ||= event.get('Service_name')
        map['Company_name'] ||= event.get('Company_name')
        map['year']   ||= event.get('year')
        map['mounth'] ||= event.get('mounth')
        map['day']    ||= event.get('day')
        event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3600
}
mutate {
    add_field => {
        "My_Data" => "%{Company_name}-%{Service_name}-%{year}-%{mounth}-%{day}-%{company_count}"
    }
}

My_Data is the output field that is written to a .txt file.

I have 68 events, and the correct output should be:

cm1-ser1-2020-08-09-52
cm1-ser2-2020-08-09-4
cm2-ser1-2020-08-09-12

When I use the "year" field as task_id, the output is as follows; it uses the last event's company_name and service_name:

cm1-ser1-2020-08-09-68

When I use "aggregate_id" as task_id, the output is:

cm1-ser1-2020-08-09-6
cm2-ser1-2020-08-09-4
cm1-ser1-2020-08-09-36
cm2-ser1-2020-08-09-2
cm1-ser1-2020-08-09-2
cm2-ser1-2020-08-09-2
cm1-ser2-2020-08-09-2
cm2-ser1-2020-08-09-2
cm1-ser2-2020-08-09-2
cm2-ser1-2020-08-09-2
cm1-ser1-2020-08-09-8

using "Company_name" and "Service_name" as "task_id" leads to same result like "aggregate_id"

Your data is not sorted, so you need to do something like example 3, and use push_map_as_event_on_timeout instead of push_previous_map_as_event.
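A rough sketch of that, reusing the field names from your filter (the timeout value and the tag are placeholders; adjust the timeout to be longer than a full run takes):

aggregate {
    task_id => "%{aggregate_id}"
    code => "
        map['company_count'] ||= 0
        map['company_count'] += 1
        map['Company_name'] ||= event.get('Company_name')
        map['Service_name'] ||= event.get('Service_name')
        map['year']   ||= event.get('year')
        map['mounth'] ||= event.get('mounth')
        map['day']    ||= event.get('day')
        event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "aggregate_id"
    timeout => 60
    timeout_tags => ["aggregated"]
}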

My input data is not sorted.

I changed the aggregate filter as follows:

aggregate {
    task_id => "%{aggregate_id}"
    code => "
        map['company_count'] ||= 0
        map['company_count'] += 1
        event.set('company_count', map['company_count'])
    "
    push_map_as_event_on_timeout => true
    timeout => 3600
}

And I am back to the first problem :slightly_smiling_face:

My output is as follows:

cm1-ser1-2020-08-09-1
cm1-ser1-2020-08-09-2
cm1-ser1-2020-08-09-3
cm1-ser1-2020-08-09-4
cm1-ser1-2020-08-09-5
cm1-ser1-2020-08-09-6
cm2-ser1-2020-08-09-1
cm2-ser1-2020-08-09-2
cm2-ser1-2020-08-09-3
cm2-ser1-2020-08-09-4
cm1-ser1-2020-08-09-7
cm1-ser1-2020-08-09-8
cm1-ser1-2020-08-09-9
cm1-ser1-2020-08-09-10
cm1-ser1-2020-08-09-11
cm1-ser1-2020-08-09-12
cm1-ser1-2020-08-09-13
cm1-ser1-2020-08-09-14
cm1-ser1-2020-08-09-15
cm1-ser1-2020-08-09-16
cm1-ser1-2020-08-09-17
cm1-ser1-2020-08-09-18
cm1-ser1-2020-08-09-19
cm1-ser1-2020-08-09-20
cm1-ser1-2020-08-09-21
cm1-ser1-2020-08-09-22
cm1-ser1-2020-08-09-23
cm1-ser1-2020-08-09-24
cm1-ser1-2020-08-09-25
cm1-ser1-2020-08-09-26
cm1-ser1-2020-08-09-27
cm1-ser1-2020-08-09-28
cm1-ser1-2020-08-09-29
cm1-ser1-2020-08-09-30
cm1-ser1-2020-08-09-31
cm1-ser1-2020-08-09-32
cm1-ser1-2020-08-09-33
cm1-ser1-2020-08-09-34
cm1-ser1-2020-08-09-35
cm1-ser1-2020-08-09-36
cm1-ser1-2020-08-09-37
cm1-ser1-2020-08-09-38
cm1-ser1-2020-08-09-39
cm1-ser1-2020-08-09-40
cm1-ser1-2020-08-09-41
cm1-ser1-2020-08-09-42
cm2-ser1-2020-08-09-5
cm2-ser1-2020-08-09-6
cm1-ser1-2020-08-09-43
cm1-ser1-2020-08-09-44
cm2-ser1-2020-08-09-7
cm2-ser1-2020-08-09-8
cm1-ser2-2020-08-09-1
cm1-ser2-2020-08-09-2
cm2-ser1-2020-08-09-9
cm2-ser1-2020-08-09-10
cm1-ser2-2020-08-09-3
cm1-ser2-2020-08-09-4
cm2-ser1-2020-08-09-11
cm2-ser1-2020-08-09-12
cm1-ser1-2020-08-09-45
cm1-ser1-2020-08-09-46
cm1-ser1-2020-08-09-47
cm1-ser1-2020-08-09-48
cm1-ser1-2020-08-09-49
cm1-ser1-2020-08-09-50
cm1-ser1-2020-08-09-51
cm1-ser1-2020-08-09-52