I am trying to aggregate events over a DAY period, discarding any session after one hour of inactivity.
The events in Elasticsearch look like this:
{
"clicks": 6,
"unique_usage": "2018-05-01_04_10.5438/0000-00SS_54.252.254.236_Amazon Route 53 Health Check Service; ref:f948b73a-405e-4f03-bfdd-81fb209f8921; report http://amzn.to/1vsZADi_robot",
"ua": {
"os_name": "Other",
"name": "Other",
"os": "Other",
"device": "Other",
"build": ""
},
"ms": "0ms",
"handle": "HTTP:HDL",
"total_usage": "10.5438/0000-00SS_robot",
"@timestamp": "2018-05-01T04:00:32.351Z",
"clientip": "54.252.254.236",
"lds": "1",
"ld": "1",
"@version": "1",
"path": "/usr/share/logstash/tests/fixtures/input46.log",
"server": "\"300:10.admin/codata\"",
"something": "\"\"",
"logdate": "2018-05-01",
"session": "2018-05-01_04_10.5438/0000-00SS_54.252.254.236_Amazon Route 53 Health Check Service; ref:f948b73a-405e-4f03-bfdd-81fb209f8921; report http://amzn.to/1vsZADi",
"occurred_at": "2018-05-01 00:00:32.351-0400",
"tags": ["_groked", "dated", "_aggregate_double_clicks", "_ua", "_access"],
"message": "54.252.254.236 HTTP:HDL \"2018-05-01 00:00:32.351-0400\" 1 1 0ms 10.5438/0000-00SS \"300:10.admin/codata\" \"\" \"Amazon Route 53 Health Check Service; ref:f948b73a-405e-4f03-bfdd-81fb209f8921; report http://amzn.to/1vsZADi\"",
"doi": "10.5438/0000-00SS",
"hour": "04",
"host": "e13f35402efa",
"several_clicks": true,
"access_method": "robot"
}
My logs are old (historical) logs, so I am using timeout_timestamp_field
so that timeouts are computed from the event timestamps rather than the current time.
Currently, the pipeline seems to be stuck in the aggregation filter, as none of the events reach the output. These are the pipeline stats:
{
"id": "1013a9b725641926270c052b680b2140549849ec3e658ed848ae1acbed7699aa",
"events": {
"duration_in_millis": 58,
"in": 3,
"out": 3
},
"matches": 3,
"name": "date"
}
],
"outputs": [
{
"id": "f61cdbfe78b0561ddb290e49e06cca0bf061f2bf7bf8cd55f39d415860ae07f0",
"events": {
"duration_in_millis": 0,
"in": 0,
"out": 0
},
"name": "file"
}
]
},
"reloads": {
"last_error": null,
"successes": 1,
"last_success_timestamp": "2018-09-11T13:34:49.565Z",
"last_failure_timestamp": null,
"failures": 0
},
"queue": {
"type": "memory"
}
}
}
}
This is my configuration:
# Pipeline: read historical events from Elasticsearch, aggregate them per
# "unique_usage" session, and write only the aggregated (timeout) events.
input {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "events"
query => '{ "query": { "query_string": { "query": "*" } } }'
size => 500
scroll => "4m"
docinfo => true
# schedule => "* * * * *"
}
}
filter {
# Parse the original event time so @timestamp reflects the historical log
# date instead of the ingestion time.
date {
match => [ "occurred_at", "yyyy-MM-dd HH:mm:ss.SSSZ", "ISO8601", "yyyy-MM-dd HH:mm:ss"]
add_tag => [ "_dated","solr" ]
}
# NOTE(review): the aggregate filter only behaves correctly with a single
# worker (pipeline.workers: 1 / -w 1); with several workers, events for the
# same task_id can land on different workers and maps never flush — verify
# this setting, as it is a common cause of "no events reach the output".
# NOTE(review): with timeout_timestamp_field, timeouts are evaluated as new
# events arrive, so the last open maps are only pushed when the pipeline
# shuts down — confirm the pipeline actually terminates after the
# Elasticsearch scroll completes (the schedule option is commented out, so
# it should).
aggregate {
# Age out maps using the (historical) event time, not wall-clock time.
timeout_timestamp_field => "@timestamp"
task_id => "%{unique_usage}"
# Emit one synthetic event per map when the map times out.
push_map_as_event_on_timeout => true
timeout_task_id_field => "unique_usage"
# Hard cap of one day per session; close a session after 1h of inactivity.
timeout => 86400
inactivity_timeout => 3600
# Count events per session and copy the first-seen value of every event
# field into the map so the pushed timeout event carries them.
# (Fixed: removed the dead `event_hash = event.to_hash` assignment — the
# variable was never used.)
code => "
map['unique_investigations'] ||= 0;
map['unique_investigations'] += 1;
event.to_hash.each do |key,value|
map[key] = value unless map.has_key?(key)
end
"
timeout_tags => ['_unique_aggregated']
# Flag sessions that had more than one event.
timeout_code => "event.set('investigations', event.get('unique_investigations') > 1);"
}
# Keep only the synthetic aggregated events; drop the raw source events.
if "_unique_aggregated" not in [tags] {
drop { }
}
}
output {
file {
path => "/usr/share/logstash/tests/reponses/output.json"
}
}