Hi, I am trying to use Logstash to build a 1-minute aggregation. When I run the query I've made in Postman, I get the expected result:
{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1000,
      "relation": "eq"
    },
    "max_score": null,
    "hits": []
  },
  "aggregations": {
    "by_minute": {
      "buckets": [
        {
          "key_as_string": "2024-07-25T02:11:00.000Z",
          "key": 1721873460000,
          "doc_count": 446,
          "by_user": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
              {
                "key": "bef2021a-49f7-4ddb-ad12-b8269f145d8d",
                "doc_count": 446,
                "by_method": {
                  "doc_count_error_upper_bound": 0,
                  "sum_other_doc_count": 0,
                  "buckets": [
                    {
                      "key": "eth_getCode",
                      "doc_count": 223,
                      "by_path": {
                        "doc_count_error_upper_bound": 0,
                        "sum_other_doc_count": 0,
                        "buckets": [
                          {
                            "key": "/56ed89eb6b014c1db058e15d675f5fec",
                            "doc_count": 223,
                            "by_code": {
                              "doc_count_error_upper_bound": 0,
                              "sum_other_doc_count": 0,
                              "buckets": [
                                {
                                  "key": 200,
                                  "doc_count": 221,
                                  "response_time_avg": {
                                    "value": 0.48868778280542985
                                  },
                                  "count": {
                                    "value": 221
                                  }
                                },
                                {
                                  "key": 500,
                                  "doc_count": 2,
                                  "response_time_avg": {
                                    "value": null
                                  },
                                  "count": {
                                    "value": 2
                                  }
                                }
                              ]
                            }
                          }
                        ]
                      }
                    },
                    {
                      "key": "eth_getCode_archive",
                      "doc_count": 223,
                      "by_path": {
                        "doc_count_error_upper_bound": 0,
                        "sum_other_doc_count": 0,
                        "buckets": [
                          {
                            "key": "/56ed89eb6b014c1db058e15d675f5fec",
                            "doc_count": 223,
                            "by_code": {
                              "doc_count_error_upper_bound": 0,
                              "sum_other_doc_count": 0,
                              "buckets": [
                                {
                                  "key": 200,
                                  "doc_count": 222,
                                  "response_time_avg": {
                                    "value": 0.26576576576576577
                                  },
                                  "count": {
                                    "value": 222
                                  }
                                },
                                {
                                  "key": 500,
                                  "doc_count": 1,
                                  "response_time_avg": {
                                    "value": null
                                  },
                                  "count": {
                                    "value": 1
                                  }
                                }
                              ]
                            }
                          }
                        ]
                      }
                    }
                  ]
                }
              }
            ]
          }
        },
        {
          "key_as_string": "2024-07-25T02:12:00.000Z",
          "key": 1721873520000,
          "doc_count": 554,
          "by_user": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
              {
                "key": "bef2021a-49f7-4ddb-ad12-b8269f145d8d",
                "doc_count": 554,
                "by_method": {
                  "doc_count_error_upper_bound": 0,
                  "sum_other_doc_count": 0,
                  "buckets": [
                    {
                      "key": "eth_getCode",
                      "doc_count": 277,
                      "by_path": {
                        "doc_count_error_upper_bound": 0,
                        "sum_other_doc_count": 0,
                        "buckets": [
                          {
                            "key": "/56ed89eb6b014c1db058e15d675f5fec",
                            "doc_count": 277,
                            "by_code": {
                              "doc_count_error_upper_bound": 0,
                              "sum_other_doc_count": 0,
                              "buckets": [
                                {
                                  "key": 200,
                                  "doc_count": 277,
                                  "response_time_avg": {
                                    "value": 0.12274368231046931
                                  },
                                  "count": {
                                    "value": 277
                                  }
                                }
                              ]
                            }
                          }
                        ]
                      }
                    },
                    {
                      "key": "eth_getCode_archive",
                      "doc_count": 277,
                      "by_path": {
                        "doc_count_error_upper_bound": 0,
                        "sum_other_doc_count": 0,
                        "buckets": [
                          {
                            "key": "/56ed89eb6b014c1db058e15d675f5fec",
                            "doc_count": 277,
                            "by_code": {
                              "doc_count_error_upper_bound": 0,
                              "sum_other_doc_count": 0,
                              "buckets": [
                                {
                                  "key": 200,
                                  "doc_count": 277,
                                  "response_time_avg": {
                                    "value": 0.07220216606498195
                                  },
                                  "count": {
                                    "value": 277
                                  }
                                }
                              ]
                            }
                          }
                        ]
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      ]
    }
  }
}
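For reference, the Postman request that produced the response above was roughly the following (this line is my reconstruction, not copied from Postman), with the same query body that appears in the Logstash input below:

POST https://localhost:9200/logs-raw_access-immutabull/_search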
Logstash output when this runs:
[2024-07-27T14:46:01,719][WARN ][logstash.filters.ruby ][one_minute_aggregation][c225f4ab7d3fb56635313da671a2087104869f614100340b6542306665f90a95] No aggregation result found in event: {"request_id"=>"8c5564ed-66e6-445c-a3a1-13f289772338", "response_flag"=>"-", "request_method"=>"eth_getCode_archive", "response_time"=>"0", "client_ip"=>"192.168.65.1", "method"=>"POST", "response_code"=>"200", "request_path"=>"/56ed89eb6b014c1db058e15d675f5fec", "@timestamp"=>2024-07-25T02:11:45.909Z, "http_version"=>"1.1", "authority"=>"localhost:8000", "upstream_host"=>"172.18.0.7:8545", "user_agent"=>"Artillery (https://artillery.io)", "@version"=>"1", "user_account"=>"bef2021a-49f7-4ddb-ad12-b8269f145d8d"}
Logstash config:
input {
  elasticsearch {
    hosts => ["https://localhost:9200"]
    user => "elastic"
    password => "hc9UKl-TmqWFf9fmAi4f"
    ssl_enabled => true
    ssl_certificate_verification => true
    ssl_certificate_authorities => ["/Users/liamyoung/Documents/GitHub/Immutibull/elastic-stuff/logstash-8.13.2/http_ca.crt"]
    index => "logs-raw_access-immutabull"
    schedule => "*/1 * * * *" # Run every minute
    query => '{
      "size": 0,
      "aggs": {
        "by_minute": {
          "date_histogram": {
            "field": "@timestamp",
            "fixed_interval": "1m"
          },
          "aggs": {
            "by_user": {
              "terms": {
                "field": "user_account"
              },
              "aggs": {
                "by_method": {
                  "terms": {
                    "field": "request_method"
                  },
                  "aggs": {
                    "by_path": {
                      "terms": {
                        "field": "request_path"
                      },
                      "aggs": {
                        "by_code": {
                          "terms": {
                            "field": "response_code"
                          },
                          "aggs": {
                            "response_time_avg": {
                              "avg": {
                                "field": "response_time"
                              }
                            },
                            "count": {
                              "value_count": {
                                "field": "response_code"
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }'
  }
}
filter {
  ruby {
    code => '
      begin
        agg_result = event.get("[aggregations][by_minute][buckets]")
        if agg_result
          agg_result.each do |minute_bucket|
            minute_key = minute_bucket["key_as_string"]
            minute_bucket["by_user"]["buckets"].each do |user_bucket|
              user_key = user_bucket["key"]
              user_bucket["by_method"]["buckets"].each do |method_bucket|
                method_key = method_bucket["key"]
                method_bucket["by_path"]["buckets"].each do |path_bucket|
                  path_key = path_bucket["key"]
                  path_bucket["by_code"]["buckets"].each do |code_bucket|
                    code_key = code_bucket["key"]
                    count = code_bucket["count"]["value"]
                    response_time_avg = code_bucket["response_time_avg"]["value"]
                    new_event = event.clone
                    new_event.set("@timestamp", minute_key)
                    new_event.set("user_account", user_key)
                    new_event.set("request_method", method_key)
                    new_event.set("request_path", path_key)
                    new_event.set("response_code", code_key)
                    new_event.set("count", count)
                    new_event.set("response_time_average", response_time_avg)
                    # Add the new event to the Logstash pipeline
                    yield new_event
                  end
                end
              end
            end
          end
        else
          logger.warn("No aggregation result found in event: #{event.to_hash}")
        end
      rescue => e
        logger.error("Failed to process event: #{event.to_hash}", :exception => e.message, :stacktrace => e.backtrace)
      end
      event.cancel # Cancel the original event
    '
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["https://localhost:9200"]
    user => "elastic"
    password => "hc9UKl-TmqWFf9fmAi4f"
    ssl_enabled => true
    ssl_certificate_verification => true
    ssl_certificate_authorities => ["/Users/liamyoung/Documents/GitHub/Immutibull/elastic-stuff/logstash-8.13.2/http_ca.crt"]
    index => "logs-1m-%{+YYYY.MM.dd}"
  }
}
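For clarity, what I want the ruby filter to emit is one flattened event per innermost by_code bucket. Taking the first by_code bucket from the Postman response above, I would expect roughly this on stdout (a hand-written rubydebug sketch, not actual output; the clone would also carry over any remaining fields of the source event):

{
               "@timestamp" => "2024-07-25T02:11:00.000Z",
             "user_account" => "bef2021a-49f7-4ddb-ad12-b8269f145d8d",
           "request_method" => "eth_getCode",
             "request_path" => "/56ed89eb6b014c1db058e15d675f5fec",
            "response_code" => 200,
                    "count" => 221,
    "response_time_average" => 0.48868778280542985
}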
This means it is failing at this check in the ruby filter:

agg_result = event.get("[aggregations][by_minute][buckets]")
if agg_result

and falling through to the else branch, which prints the warning:

else
  logger.warn("No aggregation result found in event: #{event.to_hash}")
The query itself does run on schedule; the input logs a start and a completion each time:

[INFO ][logstash.inputs.elasticsearch.searchafter][one_minute_aggregation][cbf197f3cdb80fb9c764e3931d3efa93ad66f672a18a851431a5153ce4eef531] Query start
[INFO ][logstash.inputs.elasticsearch.searchafter][one_minute_aggregation][cbf197f3cdb80fb9c764e3931d3efa93ad66f672a18a851431a5153ce4eef531] Query completed
Each event that reaches the filter turns out to be just a single document from the index, not the aggregation response. How can I fix this query/config so the filter sees the aggregation results? The query itself works fine in Postman.