Aggregate Filter does not work with OLD log files


(Kristian Garza) #1

I am trying to aggregate events over a DAY period but discarding any event after one hour of inactivity.

The events in Elasticsearch look like this:

{
  "clicks": 6,
  "unique_usage": "2018-05-01_04_10.5438/0000-00SS_54.252.254.236_Amazon Route 53 Health Check Service; ref:f948b73a-405e-4f03-bfdd-81fb209f8921; report http://amzn.to/1vsZADi_robot",
  "ua": {
    "os_name": "Other",
    "name": "Other",
    "os": "Other",
    "device": "Other",
    "build": ""
  },
  "ms": "0ms",
  "handle": "HTTP:HDL",
  "total_usage": "10.5438/0000-00SS_robot",
  "@timestamp": "2018-05-01T04:00:32.351Z",
  "clientip": "54.252.254.236",
  "lds": "1",
  "ld": "1",
  "@version": "1",
  "path": "/usr/share/logstash/tests/fixtures/input46.log",
  "server": "\"300:10.admin/codata\"",
  "something": "\"\"",
  "logdate": "2018-05-01",
  "session": "2018-05-01_04_10.5438/0000-00SS_54.252.254.236_Amazon Route 53 Health Check Service; ref:f948b73a-405e-4f03-bfdd-81fb209f8921; report http://amzn.to/1vsZADi",
  "occurred_at": "2018-05-01 00:00:32.351-0400",
  "tags": ["_groked", "dated", "_aggregate_double_clicks", "_ua", "_access"],
  "message": "54.252.254.236 HTTP:HDL \"2018-05-01 00:00:32.351-0400\" 1 1 0ms 10.5438/0000-00SS \"300:10.admin/codata\" \"\" \"Amazon Route 53 Health Check Service; ref:f948b73a-405e-4f03-bfdd-81fb209f8921; report http://amzn.to/1vsZADi\"",
  "doi": "10.5438/0000-00SS",
  "hour": "04",
  "host": "e13f35402efa",
  "several_clicks": true,
  "access_method": "robot"
}

My logs are old logs, so I am using timeout_timestamp_field to deal with that.

Currently, the pipeline seems to be stuck in the aggregation filter, as none of the events go to the output. These are the pipeline stats:

					{
						"id": "1013a9b725641926270c052b680b2140549849ec3e658ed848ae1acbed7699aa",
						"events": {
							"duration_in_millis": 58,
							"in": 3,
							"out": 3
						},
						"matches": 3,
						"name": "date"
					}
				],
				"outputs": [
					{
						"id": "f61cdbfe78b0561ddb290e49e06cca0bf061f2bf7bf8cd55f39d415860ae07f0",
						"events": {
							"duration_in_millis": 0,
							"in": 0,
							"out": 0
						},
						"name": "file"
					}
				]
			},
			"reloads": {
				"last_error": null,
				"successes": 1,
				"last_success_timestamp": "2018-09-11T13:34:49.565Z",
				"last_failure_timestamp": null,
				"failures": 0
			},
			"queue": {
				"type": "memory"
			}
		}
	}
}

This is my configuration:

input {
	elasticsearch { 
		hosts => ["elasticsearch:9200"] 
		index => "events" 
query => '{ "query": { "query_string": { "query": "*" } } }'
size => 500
scroll => "4m"
docinfo => true
# schedule => "* * * * *"
	}
}


filter {

  date {
match => [ "occurred_at", "yyyy-MM-dd HH:mm:ss.SSSZ", "ISO8601", "yyyy-MM-dd HH:mm:ss"]
add_tag => [ "_dated","solr" ]
  }


  aggregate {
timeout_timestamp_field => "@timestamp"
task_id => "%{unique_usage}"
push_map_as_event_on_timeout => true
timeout_task_id_field => "unique_usage"
timeout => 86400
inactivity_timeout => 3600
code => "
  map['unique_investigations'] ||= 0; 
  map['unique_investigations'] += 1;
  event_hash = event.to_hash
  event.to_hash.each do |key,value|
      map[key] = value unless map.has_key?(key)
  end
"
timeout_tags => ['_unique_aggregated']
timeout_code => "event.set('investigations', event.get('unique_investigations') > 1);"
  }

  if "_unique_aggregated" not in [tags] {
drop { } 
  }
}

output {
	file {
		path => "/usr/share/logstash/tests/reponses/output.json"
	}
}

(system) #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.