Aggregate filter not catching up: input start_position = "beginning"

One of the things I love about Logstash is the "sincedb" file, which tracks the current read position and, if interrupted, remembers where it left off. So if Logstash is down for, say, 4 hours, once restarted it catches up and nothing is lost.

I discovered the aggregate filter recently, but it's not behaving this way. If I stop Logstash for a while, the aggregation simply starts over from scratch. :grimacing: This should work, shouldn't it?

I'm processing Nginx access.log records (written as JSON) in one pipeline with two outputs: one sends the raw records to ES, and the other writes the 5-minute aggregate to a file. I also tried writing the aggregation to ES.

Should aggregations be in their own pipeline, separate from the raw processing and output?

{"logstash.version"=>"7.3.2"}
Here's my input configuration.

input {
    file {
      path => "/var/log/nginx/access*"
      start_position => "beginning"
      sincedb_path => "/var/lib/logstash/plugins/inputs/file/.sincedb_nginx_update_alias"
      exclude => "*.gz"
      codec => json
    }
}

Using an aggregate filter does not affect what an input reads. By default an aggregate starts from scratch when logstash restarts. You can use the aggregate_maps_path to persist the contents of the maps across restarts.
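For example, something like this (a minimal sketch; the task_id, code and path here are placeholders, not taken from your config):

filter {
    aggregate {
        task_id => "%{some_task_id}"
        code    => "map['count'] ||= 0; map['count'] += 1"
        # Persist in-flight maps to disk on shutdown and reload them on startup.
        # The file must live in a directory the logstash user can write to.
        aggregate_maps_path => "/var/lib/logstash/aggregate_maps.db"
    }
}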

Agreed, yes, I'm using aggregate_maps_path. Here's my configuration. When I start Logstash it always confirms that the maps were loaded: [INFO ][logstash.filters.aggregate] Aggregate maps loaded from : /usr/share/logstash/aggregate_maps.db.

input {
    file {
      path => "/var/log/nginx/access*"
      start_position => "beginning"
      sincedb_path => "/var/lib/logstash/plugins/inputs/file/.sincedb_v2"
      exclude => "*.gz"
      codec => json
    }
}


filter {
    dissect {
        mapping => { "request" => "%{} /%{streamtype}/%{account}/%{stream}/%{unparsed_request} %{protocol}/%{httpversion}" }
    }

    geoip { source => "clientip" }

    if [agent] != "-" and [agent] != "" { 
        useragent { 
            source => "agent" 
            target => "ua"
        } 
    } 

    fingerprint {
      concatenate_sources => true
      source  =>  ['clientip', 'agent', 'account', 'stream', 'cache_status', 'response']
      method  =>  "MURMUR3"
      target  =>  "eventid_5m"
    }


    # 5 minute aggregation
    aggregate {
        aggregate_maps_path => "/usr/share/logstash/aggregate_maps.db"  # must be in a folder where user logstash has access
        task_id => "%{eventid_5m}"
        code => "
            map['clientip_n_agent'] = event.get('clientip_n_agent')
            map['account']          = event.get('account')
            map['stream']           = event.get('stream')
            map['cache_status']     = event.get('cache_status')
            map['response']         = event.get('response')
            map['streamtype']       = event.get('streamtype')
            map['httpversion']      = event.get('httpversion')
            map['host']             = event.get('host')

            geoip_hash = {}
            geoip_hash.store('continent_code', event.get('[geoip][continent_code]'))
            geoip_hash.store('country_name',   event.get('[geoip][country_name]'))
            geoip_hash.store('country_code',   event.get('[geoip][country_code2]'))
            geoip_hash.store('region_name',    event.get('[geoip][region_name]'))    if event.get('[geoip][region_name]')
            geoip_hash.store('city_name',      event.get('[geoip][city_name]'))      if event.get('[geoip][city_name]')
            map['geoip'] = geoip_hash

            ua_hash = {}
            ua_hash.store('os',   event.get('[ua][os]'  ) )   if event.get('[ua][os]')
            ua_hash.store('name', event.get('[ua][name]') )   if event.get('[ua][name]')
            if event.get('[ua][os_major]')
                os_ver = [event.get('[ua][os_major]').to_s, event.get('[ua][os_minor]').to_s].join('.')
                ua_hash.store('ver', os_ver)
            end
            map['ua'] = ua_hash

            map['bytes_sum'] ||= 0; map['bytes_sum'] += event.get('bytes').to_i;

        "

        push_map_as_event_on_timeout => true
        timeout_task_id_field => "eventid_5m"
        timeout => 60
        timeout_tags => ['_5m_agg']

        timeout_code => "
            event.set('bytes', event.get('bytes_sum'))
            "
    }

}

output {
    if "_5m_agg" in [tags] {
        elasticsearch {
            hosts         => ["IP1", "IP2", "IP3"]
            index         => "ms_agg_5m"
            template_overwrite => true
        }
#         file {
#             path => "/var/log/logstash/access_summary_5m.log"
#             codec => "json_lines"
#         }
    }
}

Does timeout_timestamp_field help?
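Something along these lines, added to the existing aggregate filter (a sketch; pointing it at @timestamp is an assumption, but the idea is to have the filter compute task timeouts from each event's own timestamp rather than the system clock, which matters when old log lines are replayed after a restart):

aggregate {
    # ... existing options (task_id, code, aggregate_maps_path, etc.) ...
    # Compute task timeouts from the event's timestamp rather than system time,
    # so lines replayed after downtime time out and flush as expected.
    timeout_timestamp_field => "@timestamp"
}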

Ah, adding timeout_timestamp_field took me one step further. Now when I start Logstash, all the events that were missed while it was down are sent to ES. However, they all arrive with the current @timestamp in ES. :unamused: I'm trying to figure that out now.

It's working now! I needed to copy the @timestamp field into the map as follows, so that Elasticsearch doesn't use the @timestamp Logstash generates when the aggregate event is pushed, but instead uses the one from the logfile.

 map['@timestamp']       = event.get('@timestamp')
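To summarize, the aggregate filter ends up looking roughly like this (condensed, with the geoip/ua map code omitted, and assuming timeout_timestamp_field points at @timestamp):

    aggregate {
        aggregate_maps_path => "/usr/share/logstash/aggregate_maps.db"
        task_id => "%{eventid_5m}"
        code => "
            # Carry the original event time into the map so the pushed aggregate
            # event keeps the logfile timestamp instead of the time it was pushed.
            map['@timestamp']  = event.get('@timestamp')
            map['bytes_sum'] ||= 0; map['bytes_sum'] += event.get('bytes').to_i
        "
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "eventid_5m"
        timeout => 60
        timeout_tags => ['_5m_agg']
        timeout_timestamp_field => "@timestamp"
        timeout_code => "event.set('bytes', event.get('bytes_sum'))"
    }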
