Two different aggregate blocks in one Logstash filter

I need to aggregate two different sets of Cisco ASA logs: one for certificate verification, the other for firewall events.

    ### TOKEN PRESENCE CHECK: 717022 (cert validated), then 722051 (IP assigned).
    # To match task_id, user.name was added to 717022 during parsing from the certificate and = %{cn}.
    # If an IP is assigned and cn does not match, set is_incident => true.
    if [ASA_NMBR] in ["717022", "722051"] {
           aggregate {
              task_id => "%{user.name}"
              code => "
                   map['Count'] ||= 0
                   map['Count'] += 1
                    map['cn']             ||= event.get('cn')
                    map['user.name']      ||= event.get('user.name')
                    map['source.ip']      ||= event.get('source.ip')
                    map['@timestamp']     ||= event.get('@timestamp')
                    map['@log_type']      ||= event.get('@log_type')
                    map['source.address'] ||= event.get('source.address')
                    # [@metadata] fields need the bracket field-reference syntax
                    map['@metadata.target_index'] ||= event.get('[@metadata][target_index]')
                   map['msg']              ||= ''

                   if event.get('ASA_NMBR') == '722051' and map['cn'] != map['user.name']
                           map['is_incident'] = true
                   end

                    if !event.get('message').nil?
                        map['msg'] = map['msg'] + ' => ' + event.get('message')
                    end
              "
              push_map_as_event_on_timeout => true
              timeout => 1500
              inactivity_timeout => 1000

              timeout_code =>
              "
                 event.tag('aggregated')
                 event.set('asa-vpn_token-checked', true )
                 event.set('[@metadata][target_index]', 'p-cisco-asa')
              "
           }
    }
    else {
### GENERIC AGGREGATION
    aggregate {
                task_id => "%{@log_source} %{ASA_NMBR} %{event.action} %{protocol} %{interface} %{source.ip} %{source.port} %{destination.ip} %{destination.port} %{user.name}"
                code => "
                        map['hits'] = event.get('hit_count')
                        map['Count'] ||= 0
                        # count each event once: by its hit_count when present, otherwise by 1
                        h = event.get('hit_count')
                        if h.nil? || h == 0
                            map['Count'] += 1
                        else
                            map['Count'] += h.to_i
                        end
                        map['protocol']         ||= event.get('protocol')
                        map['interface']        ||= event.get('interface')
                        map['source.port']      ||= event.get('source.port')
                        map['source.address']   ||= event.get('source.address')
                        map['@metadata.target_index'] ||= event.get('[@metadata][target_index]')
                        map['destination.port'] ||= event.get('destination.port')
                        map['@log_source']      ||= event.get('@log_source')
                        map['@timestamp']       ||= event.get('@timestamp')
                        map['@log_type']        ||= event.get('@log_type')
                        map['@log_host']        ||= event.get('@log_host')
                        map['user.name']        ||= event.get('user.name')
                        map['msg_plus']         ||= event.get('message')
                        map['ASA_NMBR']         ||= event.get('ASA_NMBR')
                        map['event.action']     ||= event.get('event.action')

                        if !event.get('source.ip').nil?
                            if event.get('source.ip').match(/^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/)
                                map['source.ip'] = event.get('source.ip')
                            else
                                map['source.ip_name'] = event.get('source.ip')
                            end
                        end
                        if !event.get('destination.ip').nil?
                            if event.get('destination.ip').match(/^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/)
                                map['destination.ip'] = event.get('destination.ip')
                            else
                                map['destination.ip_name'] = event.get('destination.ip')
                            end
                        end
                "
                push_map_as_event_on_timeout => true
                timeout => 10

                inactivity_timeout => 5

                   timeout_code =>
                   "
                     event.tag('aggregated')
                     event.set('[@metadata][target_index]', 'p-cisco-asa')
                   "
        }
     }

When restarting Logstash, this error sometimes appears:

    For task_id pattern %{user.name}, inactivity_timeout must be lower than timeout>

Is it possible to use more than one aggregate block?
If not, how can I resolve this?

The main idea is to merge different messages into one. For example, the first message has user.name and a certificate name, and the second has user.name and an assigned IP. The messages can arrive with a time lag.
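A minimal sketch of that correlation, with the aggregate `map` modeled as a plain Ruby hash (the events and field values below are made up for illustration; field names follow the config above):

```ruby
# Two events for the same task_id (%{user.name}), arriving in order:
# 717022 carries the certificate cn, 722051 carries the assigned IP.
events = [
  { 'ASA_NMBR' => '717022', 'user.name' => 'jdoe', 'cn' => 'jdoe',
    'message' => 'cert validated' },
  { 'ASA_NMBR' => '722051', 'user.name' => 'jdoe', 'source.ip' => '10.0.0.5',
    'message' => 'IP assigned' }
]

map = {}
events.each do |event|
  map['Count'] = (map['Count'] || 0) + 1
  # keep the first non-nil value seen for each field
  %w[cn user.name source.ip].each { |f| map[f] ||= event[f] }
  # flag an incident when the IP-assignment event arrives without a matching cn
  map['is_incident'] = true if event['ASA_NMBR'] == '722051' && map['cn'] != map['user.name']
  map['msg'] = [map['msg'], event['message']].compact.join(' => ')
end

puts map.inspect
```

Both events land in one map keyed by `user.name`; since `cn` matches, `is_incident` is never set.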

Thanks

Yes, it is.
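Multiple aggregate filters in one pipeline are supported, as long as each task_id pattern is distinct, all timeout options for a given pattern are defined in a single filter, and inactivity_timeout stays lower than timeout. A stripped-down sketch (field names taken from your config; the code bodies are placeholders):

```
filter {
  if [ASA_NMBR] in ["717022", "722051"] {
    aggregate {
      task_id => "%{user.name}"
      code => "map['Count'] ||= 0; map['Count'] += 1"
      push_map_as_event_on_timeout => true
      timeout => 1500
      inactivity_timeout => 1000   # must be lower than timeout
    }
  } else {
    aggregate {
      task_id => "%{source.ip} %{destination.ip}"
      code => "map['Count'] ||= 0; map['Count'] += 1"
      push_map_as_event_on_timeout => true
      timeout => 10
      inactivity_timeout => 5      # must be lower than timeout
    }
  }
}
```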

Often, because of this error, Logstash doesn't start the pipeline.
I have to add or remove blank lines several times to make it start.
The error is the same every time.

OK, so there are definitely some bugs here. If I change the inactivity_timeout in the aggregate filter with a task_id of "%{@log_source} %{ASA_NMBR} ..." from 5 to 11, sometimes I get the correct error message:

    Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<LogStash::ConfigurationError: Aggregate plugin: For task_id pattern %{@log_source} %{ASA_NMBR} %{event.action} %{protocol} %{interface} %{source.ip} %{source.port} %{destination.ip} %{destination.port} %{user.name}, inactivity_timeout must be lower than timeout

However, if I then change it back to 5, sometimes the pipeline restarts cleanly, and sometimes I get:

    Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<LogStash::ConfigurationError: Aggregate plugin: For task_id pattern %{user.name}, inactivity_timeout must be lower than timeout>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.1/lib/logstash/filters/aggregate.rb:117:in `block in register'",

(i.e. it starts complaining about the filter in which the timeout values were never invalid)

and sometimes I get

    Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<LogStash::ConfigurationError: Aggregate plugin: For task_id pattern '%{user.name}', there are more than one filter which defines timeout options. All timeout options have to be defined in only one aggregate filter per task_id pattern.

Even though there is only one aggregate filter with that task_id. So it appears not to be deleting the configuration of the previous pipeline instance when it restarts the pipeline.
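That diagnosis is consistent with per-pattern timeout settings being kept in class-level (static) state, which survives re-instantiation of the filter within the same JVM when the pipeline reloads. A minimal Ruby sketch of that failure mode (the class and method names here are illustrative, not the plugin's actual internals):

```ruby
# Illustrative only: a registry held in a class variable, standing in for
# the per-task_id timeout settings a filter might keep statically.
class FilterRegistry
  @@timeout_by_pattern = {}

  def self.register(pattern, timeout)
    if @@timeout_by_pattern.key?(pattern)
      raise "more than one filter defines timeout options for #{pattern}"
    end
    @@timeout_by_pattern[pattern] = timeout
  end
end

FilterRegistry.register('%{user.name}', 1500)   # first pipeline start: fine

# A pipeline reload re-registers the same pattern, but the class variable
# was never cleared, so the second registration is rejected.
begin
  FilterRegistry.register('%{user.name}', 1500)
rescue => e
  puts e.message
end
```

The second registration fails even though the configuration itself never changed, which matches the spurious errors seen on restart.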
