Aggregation enrichment in Logstash with Elasticsearch filter

Hi, I'd like to enrich each new log entry with an aggregated value as it is generated. It's a performance value, and I'd like to extend it with the "average of this value over the same time period in the past 4 weeks". This should be useful in visualizations to show the expected performance threshold. The following ES query works fine in Sense and returns the right average values:

{
  "aggs": {
    "mq_avg_4w_this_period": {
      "avg": {
        "field": "mq_last_minute"
      }
    },
    "ws_avg_4w_this_period": {
      "avg": {
        "field": "mq_ws_last_minute"
      }
    }
  },
  "query": {
    "filtered": {
      "filter": {
        "bool": {
          "should": [
            {
              "range": {
                "@timestamp": {
                  "gte": "now-10m",
                  "lte": "now"
                }
              }
            },
            {
              "range": {
                "@timestamp": {
                  "gte": "now-1w-10m",
                  "lte": "now-1w"
                }
              }
            },
            {
              "range": {
                "@timestamp": {
                  "gte": "now-2w-10m",
                  "lte": "now-2w"
                }
              }
            },
            {
              "range": {
                "@timestamp": {
                  "gte": "now-3w-10m",
                  "lte": "now-3w"
                }
              }
            }
          ]
        }
      }
    }
  }
}
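As an aside, the `filtered` query shown above was deprecated in Elasticsearch 2.0 and removed in 5.0. On newer clusters the same request can be expressed with a `bool` query in filter context; a sketch of the equivalent (same field names, `size: 0` added since only the aggregations are needed, untested against your mapping):

```json
{
  "size": 0,
  "aggs": {
    "mq_avg_4w_this_period": { "avg": { "field": "mq_last_minute" } },
    "ws_avg_4w_this_period": { "avg": { "field": "mq_ws_last_minute" } }
  },
  "query": {
    "bool": {
      "filter": {
        "bool": {
          "should": [
            { "range": { "@timestamp": { "gte": "now-10m",    "lte": "now" } } },
            { "range": { "@timestamp": { "gte": "now-1w-10m", "lte": "now-1w" } } },
            { "range": { "@timestamp": { "gte": "now-2w-10m", "lte": "now-2w" } } },
            { "range": { "@timestamp": { "gte": "now-3w-10m", "lte": "now-3w" } } }
          ],
          "minimum_should_match": 1
        }
      }
    }
  }
}
```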
But it doesn't work in Logstash's elasticsearch filter:

elasticsearch {
  hosts => ["localhost"]
  index => "any-index-*"
  #query => '{"aggs":{"mq_avg_4w":{"avg":{"field":"mq_last_minute"}},"ws_avg_4w":{"avg":{"field":"mq_ws_last_minute"}}},"query":{"filtered":{"filter":{"bool":{"should":[{"range":{"@timestamp":{"gte":"now-10m","lte":"now"}}},{"range":{"@timestamp":{"gte":"now-1w-10m","lte":"now-1w"}}},{"range":{"@timestamp":{"gte":"now-2w-10m","lte":"now-2w"}}},{"range":{"@timestamp":{"gte":"now-3w-10m","lte":"now-3w"}}}]}}}}}'
  fields => [["mq_avg_4w_this_period","mq_avg_4w"],["ws_avg_4w_this_period","ws_avg_4w"]]
  enable_sort => "false"
}

Logstash debug log result:

{:timestamp=>"2016-12-16T16:58:03.672000+0100", :message=>"Failed to query elasticsearch for previous event", :index=>"any-index-2016.12.16", :query=>"{[...]}", :event=>#<LogStash::Event:0x7dbc4b7 @metadata_accessors=#<LogStash::Util::Accessors:0x4d192a58 @store={}, @lut={}>, @cancelled=false, @data={"message"=>"0C7:07:XXXX_LOAD_BY_SERVICE:1;201612161658;OK;4966736:3267:14362230:7609", "@version"=>"1", "@timestamp"=>"2016-12-16T15:58:02.617Z", "path"=>"/opt/logs/xxx/xxx_log.txt", "host"=>"xxx.xx", "type"=>"xxx-log", "spoccode"=>"0C7", "env"=>"07", "xxxx_service"=>"XXXX_LOAD_BY_SERVICE", "version"=>1, "xxx_date"=>"201612161658", "YEAR"=>"2016", "MONTHNUM"=>"1", "MONTHDAY"=>"21", "HOUR"=>"6", "MINUTE"=>"16", "SECOND"=>"58", "status"=>"OK", "mq_total"=>4966736, "mq_last_minute"=>3267, "mq_ws_total"=>14362230, "mq_ws_last_minute"=>7609, "mq_avg_4w"=>"1", "ws_avg_4w"=>"1"}, @metadata={}, @accessors=#<LogStash::Util::Accessors:0x17678c9e @store={"message"=>"0C7:07:XXXX_LOAD_BY_SERVICE:1;201612161658;OK;4966736:3267:14362230:7609", "@version"=>"1", "@timestamp"=>"2016-12-16T15:58:02.617Z", "path"=>"/opt/logs/xxx/xxx_log.txt", "host"=>"xxx.xx", "type"=>"xxx-log", "spoccode"=>"0C7", "env"=>"07", "xxxx_service"=>"XXXX_LOAD_BY_SERVICE", "version"=>1, "xxx_date"=>"201612161658", "YEAR"=>"2016", "MONTHNUM"=>"1", "MONTHDAY"=>"21", "HOUR"=>"6", "MINUTE"=>"16", "SECOND"=>"58", "status"=>"OK", "mq_total"=>4966736, "mq_last_minute"=>3267, "mq_ws_total"=>14362230, "mq_ws_last_minute"=>7609, "mq_avg_4w"=>"1", "ws_avg_4w"=>"1"}, @lut={"ws_avg_4w"=>[{"message"=>"0C7:07:XXXX_LOAD_BY_SERVICE:1;201612161658;OK;4966736:3267:14362230:7609", "@version"=>"1", "@timestamp"=>"2016-12-16T15:58:02.617Z", "path"=>"/opt/logs/xxx/xxxx_log.txt", "host"=>"xxx.xx", "type"=>"xxx-log", "spoccode"=>"0C7", "env"=>"07", "xxxx_service"=>"XXXX_LOAD_BY_SERVICE", "version"=>1, "xxx_date"=>"201612161658", "YEAR"=>"2016", "MONTHNUM"=>"1", "MONTHDAY"=>"21", "HOUR"=>"6", "MINUTE"=>"16", "SECOND"=>"58", 
"status"=>"OK", "mq_total"=>4966736, "mq_last_minute"=>3267, "mq_ws_total"=>14362230, "mq_ws_last_minute"=>7609}, "ws_avg_4w"]}>>, :error=>#<Elasticsearch::Transport::Transport::Errors::BadRequest: [400] {"error":{"root_cause":[{"type":"query_parsing_exception","reason":"Failed to parse query [{...}]","index":"any-index-2016.12.16"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query_fetch","grouped":true,"failed_shards":[{"shard":0,"index":"any-index-2016.12.16","node":"VFFt45tgScaYhFFJCAk_qg","reason":{"type":"query_parsing_exception","reason":"Failed to parse query [{...}]","index":"any-index-2016.12.16","caused_by":{"type":"parse_exception","reason":"Cannot parse '{[...]}': Encountered " <RANGE_GOOP> "\"test1\": "" at line 1, column 19.\nWas expecting one of:\n "]" ...\n "}" ...\n ","caused_by":{"type":"parse_exception","reason":"Encountered \" <RANGE_GOOP> \"\\\"test1\\\": \"\" at line 1, column 19.\nWas expecting one of:\n \"]\" ...\n \"}\" ...\n "}}}}]},"status":400}>, :level=>:warn}

Relevant part (maybe): {"type":"parse_exception","reason":"Encountered " <RANGE_GOOP> "\"test1\": "" at line 1, column 19.\nWas expecting one of:\n "]" ...\n "}" ...\n "}}}}]},"status":400}>, :level=>:warn}

OK, I think I found the problem. As I understand it, the elasticsearch filter plugin normally supports only the short query-string syntax, not the full ES query DSL. Hopefully full-DSL support has been developed and is being released, so it will probably be present in upcoming versions of Logstash's plugin library: Support for Full Elasticsearch DSL
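Once a plugin version with full-DSL support is available, the filter could be rewritten along these lines, a sketch assuming the plugin ships the `query_template` option (a file containing the full DSL request body) and an `aggregation_fields` option for copying aggregation results onto the event; the template path `/etc/logstash/templates/avg_4w.json` is a made-up example:

```
elasticsearch {
  hosts => ["localhost"]
  index => "any-index-*"
  # Full DSL request body loaded from a file instead of the
  # query-string-only `query` option
  query_template => "/etc/logstash/templates/avg_4w.json"
  # Copy each named aggregation result into an event field
  aggregation_fields => {
    "mq_avg_4w" => "mq_avg_4w_this_period"
    "ws_avg_4w" => "ws_avg_4w_this_period"
  }
  enable_sort => "false"
}
```

The template file would then hold the same JSON body that works in Sense, with the aggregation names matching the keys in `aggregation_fields`.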

