Visualization to only process latest documents in a group

Greetings,

We're trying to build a performance dashboard for our batch processing jobs, which requires filtering out documents that are no longer part of the latest run of a job.

To accomplish this, we have added a field "full_job_name" that always has the same value across reruns of a job. Our goal is now to aggregate another field, "duration_seconds", which represents how long the job has run. We would first like to filter down to the documents that have this "duration_seconds" field, and within each "full_job_name" group keep only the document whose @timestamp is the maximum.

Starting from this base, we would then like to calculate performance measures like sum(duration_seconds), avg(duration_seconds), and max(duration_seconds), combined with other filters and fields in a dashboard.

However, we're stuck after creating a visualization that shows only the values we're interested in. We've used a Top Hit metric aggregation (aggregate with Max, size 1, sorted on @timestamp descending) to show the latest "duration_seconds" value, with the table rows split into buckets by a Terms aggregation on the field "full_job_name". Although this visualization shows the correct latest "duration_seconds" for each unique job name, we can no longer aggregate any further, precisely because we've used a Top Hit metric aggregation.

A query that roughly corresponds to this visualization would be:

GET /ourindex/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "tags": "DURATION_SECONDS"
          }
        }
      ]
    }
  },
  "aggs": {
    "group": {
      "terms": {
        "field": "full_job_name.keyword"
      },
      "aggs": {
        "group_docs": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                "@timestamp": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    }
  }
}

We've tried creating a saved search in Discover for this query and basing a visualization on it, but unfortunately Kibana doesn't apply aggregations to Discover searches. Our current last resort would be to delete all documents with a given "full_job_name" before inserting new ones into Elasticsearch, but that is not our preferred option.
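
For completeness, that last-resort cleanup would presumably be a delete-by-query along these lines (just a sketch; the job name here is a made-up example):

POST /ourindex/_delete_by_query
{
  "query": {
    "term": {
      "full_job_name.keyword": "daily_load_customer_data"
    }
  }
}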

Hi Jamin,

It looks like this question fell through the cracks. Sorry about that! After thinking about it a bit, I couldn't come up with a good solution. But rather than deleting your documents, you could add a field is_latest or something like that.

Prior to inserting a new document, you'd update all other docs of that full_job_name to have is_latest: false, then you'd insert your new document, with is_latest: true.
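
A minimal sketch of that flip, assuming the index and fields from the question above (the job name, duration, and timestamp values are made up for illustration):

POST /ourindex/_update_by_query
{
  "query": {
    "bool": {
      "must": [
        { "term": { "full_job_name.keyword": "daily_load_customer_data" } },
        { "term": { "is_latest": true } }
      ]
    }
  },
  "script": {
    "source": "ctx._source.is_latest = false"
  }
}

followed by indexing the new run:

POST /ourindex/_doc
{
  "full_job_name": "daily_load_customer_data",
  "duration_seconds": 42,
  "is_latest": true,
  "@timestamp": "2019-01-15T03:00:00Z",
  "tags": ["DURATION_SECONDS"]
}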

That would allow you to do a simple stats aggregation over all docs where is_latest is true.
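
For example, a sketch against the index from the question (stats returns count, min, max, avg, and sum in one go, which covers the measures you listed):

GET /ourindex/_search
{
  "size": 0,
  "query": {
    "term": { "is_latest": true }
  },
  "aggs": {
    "duration_stats": {
      "stats": {
        "field": "duration_seconds"
      }
    }
  }
}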

Just posting another followup... I reached out to the Elasticsearch folks, and they came up with the approach below: a terms aggregation on the timestamp with size 1, ordered descending by key, isolates the latest document within each user bucket, and the max_bucket / avg_bucket pipeline aggregations then roll those per-user values back up:

POST /test/_doc/_bulk
{ "index" : {} }
{ "user": 1, "timestamp": 0, "price": 0  }
{ "index" : {} }
{ "user": 2, "timestamp": 1, "price": 1 }
{ "index" : {} }
{ "user": 1, "timestamp": 2, "price": 2 }
{ "index" : {} }
{ "user": 2, "timestamp": 3, "price": 3 }
{ "index" : {} }
{ "user": 1, "timestamp": 4, "price": 4 }
{ "index" : {} }
{ "user": 2, "timestamp": 5, "price": 5 }
{ "index" : {} }
{ "user": 1, "timestamp": 6, "price": 6 }

GET /test/_search
{
  "size": 0,
  "aggs": {
    "user": {
      "terms": {
        "field": "user",
        "size": 10
      },
      "aggs": {
        "max_timestamp": {
          "terms": {
            "field": "timestamp",
            "size": 1,
            "order": {
              "_key": "desc"
            }
          },
          "aggs": {
            "max_price": {
              "max": {
                "field": "price"
              }
            }
          }
        },
        "max_price_proxy": {
          "max_bucket": {
            "buckets_path": "max_timestamp>max_price"
          }
        }
      }
    },
    "avg_of_user_price": {
      "avg_bucket": {
        "buckets_path": "user>max_price_proxy"
      }
    }
  }
}

I think the only parts of Kibana that currently support pipeline aggs are the Time Series Visual Builder (and maybe Vega), both of which are experimental, but they might just do the trick!

Hey christopilus,

Thanks for the good followup and the great suggestions. We actually went ahead with a solution similar to your first one, but combined it with a clone of the events we're interested in.

The full "solution" in a simple explanation;

  1. Add a support tag "PERFORMANCE" or something similar to the events you want to use in latest-run dashboards but also need to see in dashboards where all jobs are shown.

  2. For all events with the tag "PERFORMANCE", clone them in Logstash (see the clone filter plugin) and add another tag "LATEST_RUN" to the cloned events. Now you have duplicate events for all performance metrics.

  3. For events that have the tag "LATEST_RUN", use a dedicated elasticsearch output in Logstash that applies the following rules/settings:

  • Override the document_id with a fixed value built from other event fields, one that is guaranteed to be unique for a given job on a specific run date. Because a rerun produces the same id, indexing the new event replaces the document from the previous run.
  • The index you write these events to has to be the same for all runs and reruns of the same job. In other words: don't choose the index dynamically based on the job's run date or on when the logs are parsed by Logstash, because documents with the same id in different indices will not be overwritten.
  4. You now have all runs of the job in Elasticsearch, and events with the "PERFORMANCE" tag are duplicated. Each visualization now needs to either filter on the "LATEST_RUN" tag or exclude it; if you forget, duplicated events show up in the visualization. Dashboards that filter on the tag automatically update themselves to reflect the latest run.
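
A minimal sketch of such a pipeline, under stated assumptions: the host, the index names, and the id fields (full_job_name, rundate) are placeholders for illustration, not our actual configuration:

filter {
  if "PERFORMANCE" in [tags] {
    # One clone per event; the clone filter applies add_tag to the clones,
    # so only the duplicates carry LATEST_RUN.
    clone {
      clones => ["latest_run"]
      add_tag => ["LATEST_RUN"]
    }
  }
}

output {
  if "LATEST_RUN" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      # One fixed index for all runs and reruns, so the id below always
      # addresses the same document.
      index => "jobs-latest"
      # Deterministic id: a rerun of the same job on the same run date
      # overwrites the previous document instead of adding a new one.
      document_id => "%{full_job_name}-%{rundate}"
      action => "index"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "jobs-%{+YYYY.MM.dd}"
    }
  }
}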

This approach is handy if you have daily batch chains with a fixed number of jobs (1000+), each with their own logging, that are prone to reruns due to data quality issues or the like. It makes it possible to compare the performance of the chains across multiple days and/or months without reruns skewing the view.

Thanks again for your help! I hope this explanation helps someone else too.
