I am working with the Kibana sample web logs dataset (kibana_sample_data_logs). Here is a sample document from the response (truncated):
"hits" : [
      {
        "_index" : "kibana_sample_data_logs",
        "_type" : "_doc",
        "_id" : "nfvc3X4BKbUZuD5M4wWJ",
        "_score" : 1.0,
        "_source" : {
          "agent" : "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)",
          "bytes" : 4155,
          "clientip" : "57.65.101.133",
          "host" : "artifacts.elastic.co",
          "index" : "kibana_sample_data_logs",
          "ip" : "57.65.101.133", 
          "message" : "57.65.101.133 - - [2018-07-25T14:13:30.450Z] \"GET /beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gz HTTP/1.1\" 200 4155 \"-\" \"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)\"",
          "phpmemory" : null,
          "timestamp" : "2022-02-02T14:13:30.450Z",
          "url" : "https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gz",
          "utc_time" : "2022-02-02T14:13:30.450Z",
          "event" : {
            "dataset" : "sample_web_logs"
          }
        }
      },
I want to find the maximum number of bytes (max_bytes) transferred per bucket of a date histogram. In other words, for all documents that fall into a given date-histogram bucket, I want to compute the maximum of the bytes field using the map/combine/reduce phases of a scripted_metric aggregation.
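For comparison, the built-in max sub-aggregation would give the same per-bucket number with a request like the one below; I am deliberately using scripted_metric instead, because I want to work through the map/combine/reduce flow myself:
GET kibana_sample_data_logs/_search?size=0
{
  "aggs": {
    "doc_buckets_for_date_histogram": {
      "auto_date_histogram": {
        "field": "timestamp",
        "buckets": 10
      },
      "aggs": {
        "max_bytes": {
          "max": { "field": "bytes" }
        }
      }
    }
  }
}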
Here is the scripted_metric version I tried:
GET kibana_sample_data_logs/_search?size=0
{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "doc_buckets_for_date_histogram": {
      "auto_date_histogram": {
        "field": "timestamp", 
        "buckets": 10
      }, 
      "aggs": {
        "max_bytes": {
          "scripted_metric": {
            "init_script": "state.max_bytes = 0L;", 
            "map_script": """ 
              def current_bytes = doc['bytes'].getValue();
              if (current_bytes > state.max_bytes)
                {state.max_bytes = current_bytes;}
            """,
            "combine_script": "return state", 
            "reduce_script": """ 
              def max_bytes = 0L;
              for (s in states) {if (s.max_bytes > (max_bytes))
                {max_bytes = s.max_bytes;}}
              return max_bytes
            """
          }
        }   
      }
    }
  }
}
But the request fails with this error:
{
  "error" : {
    "root_cause" : [ ],
    "type" : "search_phase_execution_exception",
    "reason" : "",
    "phase" : "fetch",
    "grouped" : true,
    "failed_shards" : [ ],
    "caused_by" : {
      "type" : "script_exception",
      "reason" : "runtime error",
      "script_stack" : [
        "if (s.max_bytes > (max_bytes))\n                {",
        "     ^---- HERE"
      ],
      "script" : "  ...",
      "lang" : "painless",
      "position" : {
        "offset" : 74,
        "start" : 69,
        "end" : 117
      },
      "caused_by" : {
        "type" : "illegal_argument_exception",
        "reason" : "dynamic getter [java.lang.Long, max_bytes] not found"
      }
    }
  },
  "status" : 400
}
Judging by the innermost caused_by, each element of states in the reduce script is a plain java.lang.Long rather than the state map, so the dynamic getter for s.max_bytes has nothing to resolve against. I don't understand why, since my combine script returns the whole state object, not state.max_bytes.
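If the reduce phase really is handed bare longs, I would expect a reduce script along these lines to work instead. This is only a sketch of my guess (it assumes each element of states is the per-shard max itself, as if the combine script had returned state.max_bytes), and I have not verified it:
            "reduce_script": """
              // Guess: each element of states is the per-shard max as a plain long,
              // i.e. as if combine_script had returned state.max_bytes.
              long max_bytes = 0L;
              for (s in states) {
                if (s != null && s > max_bytes) { max_bytes = s; }
              }
              return max_bytes
            """
But I would like to understand why my original version fails. Please help.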