How to get the most recent reading from two different documents in an index/machine

I have a machine that sends me temperature metrics, which I'm pulling from two different files on that machine with Filebeat. I'm trying to get the current status of all of the machine's metrics in a single query, and I'm having a hard time because the values come from two different documents.

For example:

PUT machine/_doc/1
{
  "temp1": 12,
  "@timestamp": 1596123915687
}

PUT machine/_doc/2
{
  "pressure1": 3,
  "@timestamp": 1595123815687
}

PUT machine/_doc/3
{
  "temp1": 10,
  "@timestamp": 1596023915687
}

PUT machine/_doc/4
{
  "pressure1": 1,
  "@timestamp": 1595023815687
}

The most recent temp1 is from document 1 and the most recent pressure1 is from document 2.
I want a single query that returns the most recent temp1 and pressure1 values for a given machine. A top_hits aggregation works for a single document but not across different documents. I also considered transforms, but they also have a hard time pulling values from multiple documents.

Any help?

I imagine you could use a filter aggregation with an exists query for the relevant fields to segregate the two doc types?

Hi Mark, thanks for the suggestion.

So are you suggesting a top_hits aggregation under each of those filter aggregations?

This is what I came up with, but it's quite bulky, because I actually have 3-5 different documents with values I need to pull from, and I'll be running this query every 5 minutes or so across 1,000 different machines.

GET machine/_search
{
  "size": 0,
  "aggs": {
    "temp": {
      "filter": {
        "exists": {
          "field": "temp1"
        }
      },
      "aggs": {
        "LatestValue": {
          "top_hits": {
            "size": 1,
            "_source": [
              "temp1",
              "@timestamp"
            ],
            "sort": [
              {
                "@timestamp": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    },
    "pressure": {
      "filter": {
        "exists": {
          "field": "pressure1"
        }
      },
      "aggs": {
        "LatestValue": {
          "top_hits": {
            "size": 1,
            "_source": [
              "pressure1",
              "@timestamp"
            ],
            "sort": [
              {
                "@timestamp": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    }
  }
}
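Not something proposed in the thread, just a sketch under stated assumptions: the multi-bucket `filters` aggregation accepts one named filter per document type, so the `top_hits` sub-aggregation only has to be written once, and each extra document type costs one line instead of a whole new block. Wrapping it in a `terms` aggregation on `SystemSerialNumber.keyword` (the field the transform below groups on) would return the latest value per metric for every machine in one request; `size: 1000` is an assumption based on the machine count mentioned above. Send this body to `GET machine/_search`:

```json
{
  "size": 0,
  "aggs": {
    "machines": {
      "terms": {
        "field": "SystemSerialNumber.keyword",
        "size": 1000
      },
      "aggs": {
        "metrics": {
          "filters": {
            "filters": {
              "temp": { "exists": { "field": "temp1" } },
              "pressure": { "exists": { "field": "pressure1" } }
            }
          },
          "aggs": {
            "LatestValue": {
              "top_hits": {
                "size": 1,
                "_source": ["temp1", "pressure1", "@timestamp"],
                "sort": [{ "@timestamp": { "order": "desc" } }]
              }
            }
          }
        }
      }
    }
  }
}
```

Each named bucket (`temp`, `pressure`, ...) holds only documents that contain that field, so its single top hit is the newest reading of that metric for that machine.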

I was experimenting with a transform for the different values, but I can't get the filter/exists mechanism to work with it. Any Painless suggestions? The config below only returns the value from whichever of the two documents arrived most recently, not both. It is based on https://www.elastic.co/guide/en/elasticsearch/reference/7.8/transform-painless-examples.html#painless-top-hits

{
  "group_by": {
    "SystemSerialNumber.keyword": {
      "terms": {
        "field": "SystemSerialNumber.keyword"
      }
    }
  },
  "aggregations": {
    "latest_value": {
      "scripted_metric": {
        "init_script": "state.timestamp_latest = 0L; state.last_value = ''",
        "map_script": "def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_value = params['_source']['pressure1'];} ",
        "combine_script": "return state",
        "reduce_script": "def last_value = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}} return last_value "
      }
    },
    "latest_value2": {
      "scripted_metric": {
        "init_script": "state.timestamp_latest2 = 0L; state.last_value2 = ''",
        "map_script": "def current_date2 = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date2 > state.timestamp_latest2) {state.timestamp_latest2 = current_date2; state.last_value2 = params['_source']['temp1'];} ",
        "combine_script": "return state",
        "reduce_script": "def last_value2 = ''; def timestamp_latest2 = 0L; for (s in states) {if (s.timestamp_latest2 > (timestamp_latest2)) {timestamp_latest2 = s.timestamp_latest2; last_value2 = s.last_value2;}} return last_value2 "
      }
    }
  }
}

Yep, your example looks good. It's bulky, admittedly, but I'm not sure there's much we can do to improve that.

From a performance point of view, adding a range query on @timestamp (>= the last-observed timestamp) will help avoid replaying the entire history on every run.
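A minimal sketch of that range query added to the search from post 3 (body for `GET machine/_search`). The `gte` value is a placeholder: it assumes your client remembers the newest `@timestamp` it saw on the previous run (doc 1's value is used here) and substitutes it in. Only the `temp` aggregation is shown; the `pressure` aggregation stays as it was.

```json
{
  "size": 0,
  "query": {
    "range": {
      "@timestamp": { "gte": 1596123915687 }
    }
  },
  "aggs": {
    "temp": {
      "filter": { "exists": { "field": "temp1" } },
      "aggs": {
        "LatestValue": {
          "top_hits": {
            "size": 1,
            "_source": ["temp1", "@timestamp"],
            "sort": [{ "@timestamp": { "order": "desc" } }]
          }
        }
      }
    }
  }
}
```

If a bucket comes back with no hits, nothing new arrived for that metric since the last run, so the previously stored value is still the current one.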

Your scripts do not check whether the field exists; I think you need to add an if statement:

    "latest_value": {
      "scripted_metric": {
        "init_script": "state.timestamp_latest = 0L; state.last_value = ''",
        "map_script": "def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (params['_source']['pressure1'] != null && current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_value = params['_source']['pressure1'];} ",
        "combine_script": "return state",
        "reduce_script": "def last_value = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}} return last_value "
      }
    },
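The temp1 aggregation (`latest_value2`) needs the same check. A sketch of it, assuming you keep the `*2` state names from your map and reduce scripts; note that `init_script` has to initialize those same names (`timestamp_latest2`, `last_value2`), otherwise the map script compares against a value that was never set. The object's contents go inside the pivot's `aggregations`:

```json
{
  "latest_value2": {
    "scripted_metric": {
      "init_script": "state.timestamp_latest2 = 0L; state.last_value2 = ''",
      "map_script": "def current_date2 = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (params['_source']['temp1'] != null && current_date2 > state.timestamp_latest2) {state.timestamp_latest2 = current_date2; state.last_value2 = params['_source']['temp1'];}",
      "combine_script": "return state",
      "reduce_script": "def last_value2 = ''; def timestamp_latest2 = 0L; for (s in states) {if (s.timestamp_latest2 > timestamp_latest2) {timestamp_latest2 = s.timestamp_latest2; last_value2 = s.last_value2;}} return last_value2"
    }
  }
}
```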

Alternatively, you can put a filter aggregation in front of it (requires >= 7.7), like you did in post 3.
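A sketch of that variant for pressure1, assuming a version in which transforms accept a `scripted_metric` nested under a `filter` aggregation (7.7 or later according to the reply above; check the supported-aggregations list for your version). Because the `exists` filter only lets documents with `pressure1` through, the map script can drop its own null check. The outer name `pressure` is illustrative; the object's contents go inside the pivot's `aggregations`:

```json
{
  "pressure": {
    "filter": {
      "exists": { "field": "pressure1" }
    },
    "aggs": {
      "latest_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_value = ''",
          "map_script": "def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_value = params['_source']['pressure1'];}",
          "combine_script": "return state",
          "reduce_script": "def last_value = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > timestamp_latest) {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}} return last_value"
        }
      }
    }
  }
}
```

The same shape repeats for each metric, swapping the field name in the `exists` filter and in `params['_source']`.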

I second the suggestion of a range filter. With your current config, the transform would retrieve all historical values every 5 minutes, which will lead to poor performance. A pragmatic solution: use now-10m as the lower bound.
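A sketch of where that bound goes: the transform's `source.query`. The transform ID and destination index (`machine-latest`, for example `PUT _transform/machine-latest`) are hypothetical, and `frequency` plus `sync` are assumptions reflecting the "every 5 minutes" schedule mentioned above (continuous mode). The `aggregations` object is cut down to a single `max` for brevity; your scripted_metric aggregations would go there instead.

```json
{
  "source": {
    "index": "machine",
    "query": {
      "range": {
        "@timestamp": { "gte": "now-10m" }
      }
    }
  },
  "dest": { "index": "machine-latest" },
  "frequency": "5m",
  "sync": {
    "time": { "field": "@timestamp", "delay": "60s" }
  },
  "pivot": {
    "group_by": {
      "SystemSerialNumber.keyword": {
        "terms": { "field": "SystemSerialNumber.keyword" }
      }
    },
    "aggregations": {
      "last_seen": { "max": { "field": "@timestamp" } }
    }
  }
}
```

A 10-minute window with a 5-minute frequency leaves some overlap between runs, so a reading that arrives slightly late is still picked up.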

Or you can add a group_by with a date_histogram and, e.g., fixed_interval: 5m. This way you get the latest state plus historical values bucketed by 5 minutes (or any other interval you like). Transform (>= 7.7) will internally use a range filter in this case, so you do not have to add it to the query yourself.
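A sketch of the pivot `group_by` with that date_histogram added next to the existing terms grouping; the group name `@timestamp` is an illustrative choice, and the `aggregations` are unchanged and omitted here:

```json
{
  "group_by": {
    "SystemSerialNumber.keyword": {
      "terms": { "field": "SystemSerialNumber.keyword" }
    },
    "@timestamp": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "5m"
      }
    }
  }
}
```

Each destination document then covers one machine in one 5-minute bucket. The newest bucket for a machine holds its latest state, but only for metrics that actually reported within that interval.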

I forgot to respond. It works great, thanks!!
