Aggregate filter plugin events not being added to Elasticsearch index

Hi,

In stdout I can see the aggregate events being printed, but they do not show up in the Elasticsearch index.

Logstash conf:

    filter {
      if [user.id] != "" and [user.id] != 'null' and [user.id] != 'error' {
        aggregate {
          task_id => "%{user.id}_%{+d}_%{+MMM}"
          code => "
            if event.get('user.id') != 'error'
              # accumulate the total file size across events in the task
              map['sdfilesize'] ||= 0
              fileSize = event.get('file.size')
              if fileSize
                map['sdfilesize'] += fileSize
              end
              # keep the latest value of these fields
              map['user.id'] = event.get('user.id')
              map['incident_creation_date'] = event.get('incident_creation_date')
              map['user.business_unit'] = event.get('user.business_unit')
              map['user.full_name'] = event.get('user.full_name')
              # collect these values across all events in the task
              map['recipient_identifier'] ||= []
              map['recipient_identifier'] << event.get('recipient_identifier')
              map['file.extension'] ||= []
              map['file.extension'] << event.get('file.extension')
              map['user.department'] = event.get('user.department')
            end"
          push_map_as_event_on_timeout => true
          timeout_task_id_field => "sdf"
          timeout => 300
          inactivity_timeout => 120
          timeout_tags => ['_aggregatetimeout']
        }
      }
    }

    output {
      stdout { codec => rubydebug }

      elasticsearch {
        ssl => true
        ssl_certificate_verification => false
        hosts => "<url>"
        user => "logstash"
        password => "logstash"
        index => "testidx-%{+YYYY.MM}"
        document_id => "%{id}"
        routing => "%{user.id}"
      }
    }

Where are you setting the id field? If you do not set it, you will likely end up with a single document being repeatedly updated.
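
To illustrate why (a minimal sketch of the same output as above; the comment describes standard Logstash sprintf behaviour):

    output {
      elasticsearch {
        hosts => "<url>"
        index => "testidx-%{+YYYY.MM}"
        # If the event has no [id] field, the %{id} reference is not
        # substituted, so the document is indexed with the literal _id
        # "%{id}" -- every such event then updates that same document.
        document_id => "%{id}"
      }
    }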

I'm setting it inside the filter conf. Sorry, I forgot to include that part of the conf earlier.

    fingerprint {
      concatenate_sources => true
      key => "logs"
      source => ["user.first_name", "user.last_name", "user.email", "log_id", "file.name"]
      target => "id"
      method => "SHA256"
    }

I do not see how events created by the aggregate filter would have these fields. How can the id field be calculated for these events? Is it not always going to end up being the same value? If you look at the statistics for your index, do you see a lot of updates/deletes?
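
For example, the index stats API will show those counters (substitute your actual index name, or use a wildcard):

    GET /testidx-*/_stats/docs,indexing
    # look at "docs.deleted" and "indexing.noop_update_total" in the response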

For the task_id I'm using user.id, not id.

No, it won't be the same, because the fingerprint also includes log_id, which is unique.
I also tried removing document_id from the output conf, but the aggregate events are still not getting added.

I don't see any deletions. Stats attached below; please let me know if you find anything.

"primaries": { "docs": { "count": 32, "deleted": 0 }, "store": { "size_in_bytes": 110226 }, "indexing": { "index_total": 32, "index_time_in_millis": 27, "index_current": 0, "index_failed": 0, "delete_total": 0, "delete_time_in_millis": 0, "delete_current": 0, "noop_update_total": 0, "is_throttled": false, "throttle_time_in_millis": 0 }, "get": { "total": 0, "time_in_millis": 0, "exists_total": 0, "exists_time_in_millis": 0, "missing_total": 0, "missing_time_in_millis": 0, "current": 0 }, "search": { "open_contexts": 0, "query_total": 9, "query_time_in_millis": 4, "query_current": 0, "fetch_total": 2, "fetch_time_in_millis": 22, "fetch_current": 0, "scroll_total": 0, "scroll_time_in_millis": 0, "scroll_current": 0, "suggest_total": 0, "suggest_time_in_millis": 0, "suggest_current": 0 }, "merges": { "current": 0, "current_docs": 0, "current_size_in_bytes": 0, "total": 0, "total_time_in_millis": 0, "total_docs": 0, "total_size_in_bytes": 0, "total_stopped_time_in_millis": 0, "total_throttled_time_in_millis": 0, "total_auto_throttle_in_bytes": 41943040 }, "refresh": { "total": 17, "total_time_in_millis": 59, "listeners": 0 }, "flush": { "total": 3, "periodic": 0, "total_time_in_millis": 49 }, "warmer": { "current": 0, "total": 9, "total_time_in_millis": 0 }, "query_cache": { "memory_size_in_bytes": 0, "total_count": 0, "hit_count": 0, "miss_count": 0, "cache_size": 0, "cache_count": 0, "evictions": 0 }, "fielddata": { "memory_size_in_bytes": 0, "evictions": 0 }, "completion": { "size_in_bytes": 0 }, "segments": { "count": 2, "memory_in_bytes": 54638, "terms_memory_in_bytes": 45578, "stored_fields_memory_in_bytes": 624, "term_vectors_memory_in_bytes": 0, "norms_memory_in_bytes": 7168, "points_memory_in_bytes": 44, "doc_values_memory_in_bytes": 1224, "index_writer_memory_in_bytes": 0, "version_map_memory_in_bytes": 0, "fixed_bit_set_memory_in_bytes": 0, "max_unsafe_auto_id_timestamp": -1, "file_sizes": {} }, "translog": { "operations": 32, "size_in_bytes": 67036, "uncommitted_operations": 0, "uncommitted_size_in_bytes": 110, "earliest_last_modified_age": 0 }, "request_cache": { "memory_size_in_bytes": 770, "evictions": 0, "hit_count": 3, "miss_count": 2 }

Hope this helps; please let me know.

@Christian_Dahlqvist
Hi Chris,

If you could go through my previous replies, that would be great!
Please let me know if you need any more info to debug.

@Badger Can you please help me with this?

As Christian pointed out, none of the fields you are using to generate the fingerprint are part of the event.

The aggregate filter generates the event when there is a timeout. The event will have the fields that you added to the map. That is,

user.id, incident_creation_date, user.business_unit, user.full_name,
recipient_identifier, file.extension, user.department, sdf

It will not have any of the fields you are using for the fingerprint.

source => ["user.first_name", "user.last_name", "user.email", "log_id", "file.name"]
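
If you want the generated events to end up with a usable id, one option (a sketch, untested) would be to also store those fields in the map inside the code block, so they survive into the timeout event:

    # hypothetical additions inside the aggregate 'code' string: carry the
    # fingerprint source fields into the map; ||= keeps the value from the
    # first event of the task
    map['user.first_name'] ||= event.get('user.first_name')
    map['user.last_name']  ||= event.get('user.last_name')
    map['user.email']      ||= event.get('user.email')
    map['log_id']          ||= event.get('log_id')
    map['file.name']       ||= event.get('file.name')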

An easy way to test whether it is the fingerprint filter causing problems is to simply comment it out.
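
Alternatively, since the generated events are tagged with _aggregatetimeout, you could branch on that tag so each kind of event gets an id from fields it actually has (a sketch, assuming the timeout_task_id_field value in [sdf] is unique enough per task):

    filter {
      if "_aggregatetimeout" in [tags] {
        # timeout events only carry the map fields, so derive the id from
        # the task id that the aggregate filter copied into [sdf]
        fingerprint {
          key => "logs"
          source => ["sdf"]
          target => "id"
          method => "SHA256"
        }
      } else {
        # original events keep the existing fingerprint
        fingerprint {
          concatenate_sources => true
          key => "logs"
          source => ["user.first_name", "user.last_name", "user.email", "log_id", "file.name"]
          target => "id"
          method => "SHA256"
        }
      }
    }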
