My aggregate indices version 1, thoughts?

Hi,

I have been looking for a good way to aggregate metrics data and terms counts across the various indices I have been collecting. I'm mostly looking at infrastructure component performance and throughput metrics.
I have indices for message queuing stats, application response times, etc.

I like that Elastic provides the Aggregation query; I have used "date_histogram" combined with either a terms or a stats bucket. In the end I would like an aggregate of my data that I can keep long term, maybe aggregated every day. Even producing an aggregate every 15 minutes would allow me to visualize a week's worth of data. Currently my data is so granular that I'm lucky if Kibana can render 2 days.
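
For anyone who hasn't used it, here is a minimal sketch of the kind of query I mean (the index name, the response_time field, and the endpoint are placeholders, not my real data):

# Minimal sketch: date_histogram plus a stats sub-aggregation with elasticsearch-dsl.
# The index, field, and endpoint below are placeholders.
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch(['http://localhost:9200'])
s = Search(using=client, index='logstash-qstats-2017.10.24')

# One bucket per 15-minute window, with response-time stats inside each bucket
s.aggs.bucket('per_15m', 'date_histogram', field='@timestamp', interval='15m')\
    .metric('rt_stats', 'stats', field='response_time')

response = s.execute()
for bucket in response.aggregations.per_15m.buckets:
    print(bucket.key_as_string, bucket.doc_count, bucket.rt_stats.avg)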

  • Aggregation script - Python, using elasticsearch-dsl. I pass this script the index name, the fields to aggregate on, and the interval to aggregate at. The script outputs a new index containing the aggregate. If it's a 24-hour aggregate it's not much data, so I put it in an aggregate index with a suffix of the current year. If it's a 15-minute aggregate I use a date suffix of YYYY.MM.DD. I name my aggregates like this: aggs-qstats-queue-15m-2017.10.24, i.e. index=qstats, byname=queue, aggtime=15m. That's the idea, and it works for this data set.
  • Curator script - Curator deletes the aggregate data by its date suffix: YYYY is kept forever, YYYY.MM is kept for 12 months, YYYY.MM.DD is kept for 1 month. (A sketch of the matching delete logic follows this list.)
  • Jenkins job - I have a Jenkins job that runs a shell script executing the aggregation from the queue example above, 3 times (15m, 1h, 24h).
  • Index aliases - After the aggregate indices are created, I create a search alias that glues the resulting aggregates together, so you can look at the current data and the aggregates seamlessly in Kibana.
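
For the retention piece, here is roughly what the delete logic looks like, sketched with Curator's Python API; the endpoint, prefix, and 30-day cutoff are placeholders, and the real job repeats this for each suffix style:

# Rough sketch of one retention rule via the elasticsearch-curator Python API.
# Placeholder endpoint/prefix/cutoff; this repeats per suffix style
# (YYYY.MM.DD kept 1 month, YYYY.MM kept 12 months, YYYY kept forever).
import elasticsearch
import curator

client = elasticsearch.Elasticsearch(['http://localhost:9200'])

ilo = curator.IndexList(client)
# Only consider the 15m aggregate indices, which carry a YYYY.MM.DD suffix
ilo.filter_by_regex(kind='prefix', value='aggs-qstats-queue-15m-')
# Keep 1 month: drop indices whose name-embedded date is older than 30 days
ilo.filter_by_age(source='name', direction='older',
                  timestring='%Y.%m.%d', unit='days', unit_count=30)
curator.DeleteIndices(ilo).do_action()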

This all works, but it feels like a very fragile solution. I feel like there should be a better way to do this, or native support for maintaining aggregates. I think Machine Learning already does its own aggregates before it analyzes the data, but could those aggregates from ML be exposed for normal viewing?

I'm looking for feedback; this is just the first iteration of this process, and I think it can be better. Is anyone else solving this problem? Elastic, do you have any plans to make aggregating data more seamless?
I have attached some sample scripts if anyone is interested.
Thanks,
Tim

#!/usr/bin/python
# Script: callresults_aggregation.py
# Purpose: Aggregates data in callresults index
# Author: Tim Arp

from __future__ import print_function
from __future__ import division
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search
from datetime import datetime, timedelta
import argparse
import dateparser
import sys

def parse_arguments():
    parser = argparse.ArgumentParser(description='Does a terms level aggregation, single level')
    parser.add_argument("--endpoint", help="Endpoint for your ES domain", action="store", required=True)
    parser.add_argument("--index", help="index string to search for, overrides the indexprefix", action="store")
    parser.add_argument("--indexprefix", help="first part of the index minus the date", action="store")
    parser.add_argument("--byname", help="string to name the group part of aggregate name", action="store")
    parser.add_argument("--byfield", help="string of field to group by", action="store")
    parser.add_argument("--byfieldsize", help="upper limit of unique byfields", action="store")
    parser.add_argument("--aggtime", help="string of aggregation time, 60m, 1d, etc", action="store")
    parser.add_argument("--aggname", help="string of aggregation name, aggs-theagname", action="store")
    parser.add_argument("--monthly", help="Indicate to add the month suffix to index name", default=False, action="store_true")
    parser.add_argument("--daily", help="Indicate to add the daily suffix to index name", default=False, action="store_true")

    return parser.parse_args()

args = parse_arguments()
for each_arg in vars(args):
    print(each_arg, ':', getattr(args, each_arg))

##### Variable Declarations
esEndpoint = "http://" + args.endpoint + "/"

# Use the given index, or default to yesterday's daily index from the prefix
if args.index:
    queryIndex = args.index
else:
    yesterday = datetime.now() - timedelta(days=1)
    queryIndex = args.indexprefix + yesterday.strftime('%Y.%m.%d')

index_prefix = 'aggs-' + args.aggname + '-' + args.byname + '-' + args.aggtime + '-'
index_type = 'aggs-' + args.aggname + '-' + args.byname
responseCode = 0
print('esEndpoint=', esEndpoint)
print('queryIndex=', queryIndex)
print('Starting...')

client = Elasticsearch([esEndpoint], timeout=120)
metric_bulk_data = []

print('Aggregation every ' + args.aggtime)
print('Source indices: ' + queryIndex)
s = Search(using=client, index=queryIndex)
# Bucket by the requested interval, then count documents per unique byfield value
s.aggs.bucket(args.aggtime, 'date_histogram', field='@timestamp', interval=args.aggtime)\
    .metric('things', 'terms', field=args.byfield, size=args.byfieldsize)

response = s.execute()

# The date_histogram bucket was registered under args.aggtime above
theagg = response.aggregations[args.aggtime]
for thebucket in theagg.buckets:
    print('Aggregating bucket ' + thebucket.key_as_string)
    # Choose the target index suffix: daily, monthly, or yearly
    if args.monthly:
        if args.daily:
            index_name = index_prefix + dateparser.parse(thebucket.key_as_string).strftime('%Y.%m.%d')
        else:
            index_name = index_prefix + dateparser.parse(thebucket.key_as_string).strftime('%Y.%m')
    else:
        index_name = index_prefix + dateparser.parse(thebucket.key_as_string).strftime('%Y')
    print('Agg index name:', index_name)
    for thisthing in thebucket.things.buckets:
        # One aggregate document per (term, time bucket) pair; the deterministic
        # _id makes re-runs overwrite documents instead of duplicating them
        data_dict = {}
        data_dict['_id'] = str(thisthing.key) + '-' + str(thebucket.key)
        data_dict['_op_type'] = 'index'
        data_dict['_index'] = index_name
        data_dict['_type'] = index_type
        data_dict['@timestamp'] = thebucket.key_as_string
        data_dict[args.byname] = thisthing.key
        data_dict['count'] = thisthing.doc_count
        metric_bulk_data.append(data_dict)

    # Flush this time bucket; stats_only=True returns (successes, errors)
    success_count, error_count = helpers.bulk(client, metric_bulk_data, stats_only=True)
    if error_count:
        print('Failed to index', error_count, 'documents')
        responseCode += 1
    else:
        print('Successfully indexed', success_count, 'documents')
    metric_bulk_data = []
print('Done')
sys.exit(responseCode)
#!/bin/bash
# ActiveMQ
python ./scripts/elastic/agg_qstats.py --endpoint theelasticsearchserver.com:9200 --byname queue --aggtime 15m --byfield queue.keyword --byfieldsize 10000 --monthly --daily --mqtype activemq
retVal=$?
if [ $retVal -ne 0 ]; then
    echo "Error with 15m aggregation $retVal"; exit $retVal
fi
python ./scripts/elastic/agg_qstats.py --endpoint theelasticsearchserver.com:9200 --byname queue --aggtime 1h --byfield queue.keyword --byfieldsize 10000 --monthly --mqtype activemq
retVal=$?
echo "retVal: $retVal"
if [ $retVal -ne 0 ]; then
    echo "Error with 1h aggregation $retVal"; exit $retVal
fi
python ./scripts/elastic/agg_qstats.py --endpoint theelasticsearchserver.com:9200 --byname queue --aggtime 24h --byfield queue.keyword --byfieldsize 10000 --mqtype activemq
retVal=$?
if [ $retVal -ne 0 ]; then
    echo "Error with 24h aggregation $retVal"; exit $retVal
fi
#IBM MQ
python ./scripts/elastic/agg_qstats.py --endpoint theelasticsearchserver.com:9200 --byname queue --aggtime 15m --byfield queue.keyword --byfieldsize 10000 --monthly --daily --mqtype ibmmq
retVal=$?
if [ $retVal -ne 0 ]; then
    echo "Error with 15m aggregation $retVal"; exit $retVal
fi
python ./scripts/elastic/agg_qstats.py --endpoint theelasticsearchserver.com:9200 --byname queue --aggtime 1h --byfield queue.keyword --byfieldsize 10000 --monthly --mqtype ibmmq
retVal=$?
echo "retVal: $retVal"
if [ $retVal -ne 0 ]; then
    echo "Error with 1h aggregation $retVal"; exit $retVal
fi
python ./scripts/elastic/agg_qstats.py --endpoint theelasticsearchserver.com:9200 --byname queue --aggtime 24h --byfield queue.keyword --byfieldsize 10000 --mqtype ibmmq
retVal=$?
if [ $retVal -ne 0 ]; then
    echo "Error with 24h aggregation $retVal"; exit $retVal
fi

# "yesterday 13:00" pins a midday time so DST shifts can't skew the date math
YESTERDAY=$(date -d "yesterday 13:00" '+%Y.%m.%d')
TODAY=$(date '+%Y.%m.%d')
curl -XPOST 'theelasticsearchserver.com:9200/_aliases' -H 'Content-Type: application/json' -d "{
  \"actions\": [
    { \"remove\": { \"index\": \"logstash-activemq-qstats-*\", \"alias\" : \"qstats-search\" }},
    { \"add\": { \"index\": \"aggs-qstats-queue-1h-*\", \"alias\" : \"qstats-search\" }},
    { \"add\": { \"index\": \"aggs-qstats-queue-24h-*\", \"alias\" : \"qstats-search\" }},
    { \"add\": { \"index\": \"aggs-qstats-queue-15m-*\", \"alias\" : \"qstats-search\" }},
    { \"add\": { \"index\": \"logstash-mq-$YESTERDAY\", \"alias\" : \"qstats-search\" }},
    { \"add\": { \"index\": \"logstash-mq-$TODAY\", \"alias\" : \"qstats-search\" }},
    { \"add\": { \"index\": \"logstash-activemq-qstats-$YESTERDAY\", \"alias\" : \"qstats-search\" }},
    { \"add\": { \"index\": \"logstash-activemq-qstats-$TODAY\", \"alias\" : \"qstats-search\" }}
  ]
}"

Hey! Yeah, you generally have the right idea. We agree it's fairly fragile to take this approach, and we've been working on something that could help situations like this. We're actively seeking feedback on some of the ideas we have. If you would like to chat about the specifics of your use case so we can consider it as we develop our solution, feel free to DM me and we can chat!
