Hi,
I have been looking for a good way to aggregate metrics data and terms counts across the various indices I have been collecting. I'm mostly looking at infrastructure component performance and throughput metrics.
I have indices for message queuing stats, application response times, etc...
I like that Elastic provides aggregation queries; I have used the "date_histogram" combined with either a terms or a stats bucket. In the end I would like an aggregate of my data so that I can keep it long term, maybe aggregated every day. Even producing an aggregate every 15 minutes would let me visualize a week's worth of data. Currently my data is so granular that I'm lucky if Kibana can render 2 days.
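For reference, the query I'm running amounts to something like the following raw aggregation body (a sketch; the "queue" field, bucket names, and sizes are just illustrative values from my own setup):

```python
# Sketch of the raw aggregation body behind the approach described above.
# "queue", the bucket names, and the sizes are illustrative values.
agg_body = {
    "size": 0,  # only the aggregation buckets are needed, not the hits
    "aggs": {
        "per_interval": {
            "date_histogram": {"field": "@timestamp", "interval": "15m"},
            "aggs": {
                # terms sub-aggregation: one count per unique queue name
                "things": {"terms": {"field": "queue", "size": 500}}
            },
        }
    },
}
```

Each date_histogram bucket then carries one terms bucket per unique value, which is what the aggregate index is built from.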
- Aggregation script - A Python script using elasticsearch-dsl. I pass it the index name, the fields to aggregate on, and the aggregation interval, and it outputs a new index containing the aggregate. A 24-hour aggregate is not much data, so it goes in an aggregate index with a suffix of the current year. A 15-minute aggregate gets a date suffix of YYYY.MM.DD. I name my aggregates like this: aggs-qstats-queue-15m-2017.10.24, i.e. index=qstats, byname=queue, aggtime=15m. That's the idea, and it works for this data set.
- Curator script - Curator deletes the aggregate data by date suffix: YYYY is kept forever, YYYY.MM for 12 months, YYYY.MM.DD for 1 month.
- Jenkins job - A Jenkins job runs a shell script that executes the aggregation from the queue example above 3 times (15m, 1h, 24h).
- Index Aliases - After the aggregate indices are created, I create a search alias that glues the resulting aggregates together, so you can look at the data in Kibana seamlessly across the current data and the aggregates.
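The naming and retention conventions from the first two steps boil down to something like this (a sketch; the retention numbers are the ones I listed above, and the helper name is just for illustration):

```python
from datetime import datetime

# Sketch of the index-naming and retention conventions described above.
# Names follow aggs-<aggname>-<byname>-<aggtime>-<datesuffix>.
# Retention per suffix granularity; None means keep forever.
RETENTION_DAYS = {"%Y": None, "%Y.%m": 365, "%Y.%m.%d": 30}

def agg_index_name(aggname, byname, aggtime, when, suffix_fmt="%Y.%m.%d"):
    """Build an aggregate index name, e.g. aggs-qstats-queue-15m-2017.10.24."""
    return "aggs-{}-{}-{}-{}".format(
        aggname, byname, aggtime, when.strftime(suffix_fmt))

print(agg_index_name("qstats", "queue", "15m", datetime(2017, 10, 24)))
# aggs-qstats-queue-15m-2017.10.24
```

Curator then only has to match on the suffix granularity to apply the right retention.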
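For the alias step, the body I send to the aliases API is along these lines (a sketch; the index patterns and alias name are illustrative, and the commented-out call assumes the elasticsearch-py client):

```python
# Sketch of the alias actions body that glues the current indices and the
# aggregate indices together under one alias for Kibana.
# Index patterns and alias name are illustrative.
alias_actions = {
    "actions": [
        {"add": {"index": "qstats-*", "alias": "qstats-all"}},
        {"add": {"index": "aggs-qstats-queue-*", "alias": "qstats-all"}},
    ]
}
# With an elasticsearch-py client this would be applied with:
# client.indices.update_aliases(body=alias_actions)
```

Kibana then gets pointed at the alias instead of the raw index pattern, so the granular and aggregated data appear as one series.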
This all works, but it feels like a very fragile solution. I feel like there should be a better way to do this, or built-in support for rolling up aggregations. I think Machine Learning already does its own aggregates before it analyzes the data; could those aggregates from ML be exposed for normal viewing?
I'm looking for feedback; this is just the first iteration of this process, but I think it can be better. Is anyone else solving this problem? Elastic, do you have any plans to make aggregations of data more seamless?
I have attached a sample script below, if anyone is interested.
Thanks,
Tim
#!/usr/bin/python
# Script: callresults_aggregation.py
# Purpose: Aggregates data in the callresults index
# Author: Tim Arp
from __future__ import print_function
from __future__ import division

import argparse
import sys
from datetime import datetime, timedelta

import dateparser
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search


def parse_arguments():
    parser = argparse.ArgumentParser(description='Does a terms level aggregation, single level')
    parser.add_argument("--endpoint", help="Endpoint for your ES domain", action="store", required=True)
    parser.add_argument("--index", help="index string to search for, overrides the indexprefix", action="store")
    parser.add_argument("--indexprefix", help="first part of the index minus the date", action="store")
    parser.add_argument("--byname", help="string to name the group part of aggregate name", action="store")
    parser.add_argument("--byfield", help="string of field to group by", action="store")
    parser.add_argument("--byfieldsize", help="upper limit of unique byfields", type=int, default=100, action="store")
    parser.add_argument("--aggtime", help="string of aggregation time, 60m, 1d, etc", action="store")
    parser.add_argument("--aggname", help="string of aggregation name, aggs-theagname", action="store")
    parser.add_argument("--monthly", help="Indicate to add the month suffix to index name", default=False, action="store_true")
    parser.add_argument("--daily", help="Indicate to add the daily suffix to index name", default=False, action="store_true")
    return parser.parse_args()


args = parse_arguments()
for each_arg in vars(args):
    print(each_arg, ':', getattr(args, each_arg))

##### Variable Declarations
esEndpoint = "http://" + args.endpoint + "/"
if args.index:
    queryIndex = args.index
elif args.indexprefix:
    # Default to yesterday's daily index when only a prefix is given.
    yesterday = datetime.now() - timedelta(days=1)
    queryIndex = args.indexprefix + yesterday.strftime('%Y.%m.%d')
else:
    sys.exit('Either --index or --indexprefix is required')

index_prefix = 'aggs-' + args.aggname + '-' + args.byname + '-' + args.aggtime + '-'
index_type = 'aggs-' + args.aggname + '-' + args.byname
responseCode = 0

print('esEndpoint=', esEndpoint)
print('queryIndex=', queryIndex)
print('Starting...')

client = Elasticsearch([esEndpoint], timeout=120)
metric_bulk_data = []
print('Aggregation every ' + args.aggtime)
print('Source indices: ' + queryIndex)

s = Search(using=client, index=queryIndex)
s = s.extra(size=0)  # only the aggregation buckets are needed, not the hits
s.aggs.bucket(args.aggtime, 'date_histogram', field='@timestamp', interval=args.aggtime)\
    .metric('things', 'terms', field=args.byfield, size=args.byfieldsize)
response = s.execute()

for thebucket in response.aggregations[args.aggtime].buckets:
    print('Aggregating bucket ' + thebucket.key_as_string)
    bucket_date = dateparser.parse(thebucket.key_as_string)
    # The daily suffix takes precedence over monthly; with neither flag,
    # everything goes in a single yearly index.
    if args.daily:
        index_name = index_prefix + bucket_date.strftime('%Y.%m.%d')
    elif args.monthly:
        index_name = index_prefix + bucket_date.strftime('%Y.%m')
    else:
        index_name = index_prefix + bucket_date.strftime('%Y')
    print('Agg index name:', index_name)
    for thisthing in thebucket.things.buckets:
        # One aggregate document per (term, time bucket) pair.
        data_dict = {}
        data_dict['_id'] = str(thisthing.key) + '-' + str(thebucket.key)
        data_dict['_op_type'] = 'index'
        data_dict['_index'] = index_name
        data_dict['_type'] = index_type
        data_dict['@timestamp'] = thebucket.key_as_string
        data_dict[args.byname] = thisthing.key
        data_dict['count'] = thisthing.doc_count
        metric_bulk_data.append(data_dict)
    success, errors = helpers.bulk(client, metric_bulk_data, stats_only=True)
    print('Successfully indexed', success, 'documents')
    if errors:
        print('Failed to index', errors, 'documents')
        responseCode += 1
    metric_bulk_data = []

print('Done')
sys.exit(responseCode)