[Resolved] Logstash - SUM conditionally with the aggregate filter; can't get the expected csv output - how to correct?

Hi Team,

I have the below CSV data, exported from ES:

"April 23rd 2019, 09:13:31.562",SCM,MR,cgrant1,SCM.MR.LOAD_FILTERS,TALSCMK,TALSCMK,1
"April 23rd 2019, 22:38:31.868",SCM,TS,101031,SCM.TS.LIST_SAVED_SEARCH,CQAAbby,CQAAbby,1
"April 23rd 2019, 22:38:33.700",SCM,TS,101031,SCM.TS.START_OVER,CQAAbby,CQAAbby,1
"April 23rd 2019, 22:38:42.015",SCM,TS,101031,SCM.TS.SEARCH,CQAAbby,CQAAbby,1
"April 24th 2019, 01:02:42.097",SCM,TS,100253,SCM.TS.COMPARE,CQAAbby,CQAAbby,1
"April 24th 2019, 01:03:01.850",SCM,TS,100253,SCM.TS.SEARCH,CQAAbby,CQAAbby,1
"April 24th 2019, 01:03:09.915",SCM,TS,100253,SCM.TS.COMPARE,CQAAbby,CQAAbby,1
"April 24th 2019, 01:03:34.538",SCM,TS,100253,SCM.TS.SEARCH,CQAAbby,CQAAbby,1

and I want to get a sum grouped by the different at values. The expected output should be:

    action,sum
    SCM.MR.LOAD_FILTERS,1
    SCM.TS.LIST_SAVED_SEARCH,1
    SCM.TS.START_OVER,1
    SCM.TS.SEARCH,3
    SCM.TS.COMPARE,2
This is my Logstash configuration file, but the file "funneldata.csv" created by the csv output is empty:

    filter {
        csv {
            columns => [ "when", "module", "fa", "ui", "at", "ci", "cn", "link" ]
            separator => ","
            skip_header => "true"
        }
        aggregate {
            task_id => "%{at}"
            code => "map['action'] ||= ''
                     map['action'] = event.get('at')
                     map['sum'] ||= 0
                     map['sum'] += 1"
            push_map_as_event_on_timeout => true
            timeout_task_id_field => "at"
            inactivity_timeout => 30
            timeout_code => "event.set('action', event.get('at'))
                             event.set('sum', event.get('sum'))"
        }
    }
    output {
        csv {
            fields => [ "action", "sum" ]
            path => "C:/elkstack/elasticsearch-6.5.1/logs/funneldata.csv"
            csv_options => { "headers" => true }
        }
        stdout { codec => rubydebug }
    }

How do I correct the filter to get the expected output file?

Your filters may be working as is, although I would rewrite them as

    csv {
        columns => [ "when", "module", "fa", "ui", "at", "ci", "cn", "link" ]
        separator => ","
        skip_header => "true"
    }
    aggregate {
        task_id => "%{at}"
        code => "map['action'] ||= ''
                 map['action'] = event.get('at')
                 map['sum'] ||= 0
                 map['sum'] += 1
                 event.cancel"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "at"
        timeout => 10
    }

Note that I added event.cancel to the code in the aggregate filter; otherwise every line of input passes through to the output and appends a row with empty action and sum fields to the output csv. Also, your timeout_code is a no-op. When the inactivity_timeout triggers, the aggregated action and sum values are written to the csv.


Sorry... What is a “no-op”? Should I remove inactivity_timeout and timeout_code?

I read the documentation for the aggregate filter and thought that the logs in my csv had no end event; it is also difficult to figure out a suitable timeout in seconds when there are thousands of logs in the csv. So I chose the inactivity_timeout option to let Logstash know that if no new event for a task is detected within 30s, the corresponding task can be ended. Because the input is a file exported from ES and the number of logs is fixed, Logstash waits 30s, then executes the timeout code to get the at and sum for each task and writes them to the output file.

Please help correct me if the understanding is not completely right.

A no-op has no effect. You should remove timeout_code.

In the map for each task_id you have action and sum. When the timeout occurs it will push the map as an event. The event will contain action and sum from the map, plus an at field set equal to the task_id. The timeout_code is executing against the newly created event from the map, which already has at, action, and sum set.
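To make the mechanics concrete, here is a plain-Ruby sketch (not Logstash itself, just an illustration under the column names from the rewrite above) of what the per-task map accumulation and the push-on-timeout amount to, using a few of the sample rows:

```ruby
require 'csv'

# One map per task_id (the "at" column), a counter incremented for each
# event, and each map emitted as an action/sum row when the timeout fires.
data = <<~ROWS
  "April 23rd 2019, 22:38:31.868",SCM,TS,101031,SCM.TS.LIST_SAVED_SEARCH,CQAAbby,CQAAbby,1
  "April 23rd 2019, 22:38:42.015",SCM,TS,101031,SCM.TS.SEARCH,CQAAbby,CQAAbby,1
  "April 24th 2019, 01:03:01.850",SCM,TS,100253,SCM.TS.SEARCH,CQAAbby,CQAAbby,1
ROWS

maps = {}
CSV.parse(data) do |row|
  at = row[4]                                  # columns: when,module,fa,ui,at,...
  map = (maps[at] ||= { 'action' => at, 'sum' => 0 })
  map['sum'] += 1                              # same as map['sum'] += 1 in the filter code
end

# push_map_as_event_on_timeout: each map becomes one output row
maps.each_value { |m| puts "#{m['action']},#{m['sum']}" }
# SCM.TS.LIST_SAVED_SEARCH,1
# SCM.TS.SEARCH,2
```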

I don't think it makes a difference whether you use timeout or inactivity_timeout to trigger pushing the event.
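Putting this together, a corrected filter/output section might look like the following (a sketch only: timeout_code removed, event.cancel added, and the column names assumed from the rewrite above):

```
filter {
    csv {
        columns => [ "when", "module", "fa", "ui", "at", "ci", "cn", "link" ]
        separator => ","
        skip_header => "true"
    }
    aggregate {
        task_id => "%{at}"
        code => "map['action'] ||= event.get('at')
                 map['sum'] ||= 0
                 map['sum'] += 1
                 event.cancel"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "at"
        timeout => 10
    }
}
output {
    csv {
        fields => [ "action", "sum" ]
        path => "C:/elkstack/elasticsearch-6.5.1/logs/funneldata.csv"
        csv_options => { "headers" => true }
    }
}
```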


Thanks for the explanation, which gave me a deeper understanding of how to use the aggregate filter.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.