Logstash - aggregate results

Hello,

When I am using the aggregate filter in logstash (to get the total sales of each product, for example), is there a way to send only the aggregated results to a different output, rather than sending the data line by line?

Thank you!

Yes, you can drop the lines that are not aggregated and just keep the aggregations. See here for an example. If you need more details then you need to show us your input data and what your current aggregate filter configuration looks like.
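
A minimal sketch of that idea (field names are placeholders; adjust the task id and the summed field to your data):

filter {
  aggregate {
    task_id => "%{product}"
    code => "map['total'] ||= 0; map['total'] += event.get('sales').to_f"
    push_map_as_event_on_timeout => true
    timeout => 120
    timeout_tags => ["aggregated"]
  }
  # only the events pushed on timeout carry the tag; drop everything else
  if "aggregated" not in [tags] { drop {} }
}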

Thank you!! We tried to implement your suggestion. However, we have a large quantity of data, and we want to read all of it and only show the aggregation results at the end. We tried increasing the timeout, but we never know when the final results will be ready.

Here is the code; if you can help us, we would appreciate it. We just want to output the aggregation results.

input {
  elasticsearch {
    hosts => "XXX"
    index => "logs_0"
    schedule => "* * * * *"
  }
}
filter {
  aggregate {
    task_id => "%{subProcessName}"
    code => "map['total'] ||= 0; map['total'] += event.get('executionTime');"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "subProcessName"
    timeout => 4
    timeout_tags => ['_aggregatetimeout']
    timeout_code => "event.set('[@metadata][wanted]', 1)"
  }
  if [@metadata][wanted] != 1 { drop {} }
}
output {
  if "_aggregatetimeout" in [tags] {
    elasticsearch {
      hosts => "XXX"
      index => "logs_4"
    }
  }
}

If I am reading it correctly, that elasticsearch input re-reads the entire index once a minute. Is that right? The aggregate looks right, although a 4-second timeout is not very long. What problem are you having?

Yes, we noticed that the schedule does not make sense here, so we deleted it.
The problem is that logstash takes some time to get the results from elasticsearch, and if the timeout is higher than the time logstash takes to read from elasticsearch, logstash will stop before showing any aggregation results. If I use a timeout like 10 seconds, the aggregation results are repeated every 10 seconds. I want to have something like:
process1 => 10,
process2 => 20
and I am getting
process1 => 5,
process1 => 5,
process2 => 10,
process2 => 10
(sometimes it loses track of the sum and we lose data)

Any idea?

Lower, not higher. Why not use an 1800-second timeout?

But if logstash stops before those 1800 seconds elapse, the results will not appear as desired, because logstash stops as soon as it has read everything from elasticsearch. Right?

Can you use push_previous_map_as_event? That will cause logstash to flush the map when it exits.
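
A rough sketch of that option, reusing the field names from your config. Note that push_previous_map_as_event keeps only one map at a time, so it assumes the events arrive sorted by task id; the previous map is pushed as soon as a different task id shows up, and the last map is flushed when logstash shuts down:

filter {
  aggregate {
    task_id => "%{subProcessName}"
    code => "map['total'] ||= 0; map['total'] += event.get('executionTime')"
    push_previous_map_as_event => true
    timeout_task_id_field => "subProcessName"
    timeout => 1800
  }
}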

We found the solution: ordering the data from elasticsearch. That way, every time a new process arrived, the previous results were pushed and the aggregation restarted. Thanks!

Is it possible to aggregate results (like process and total value) and then, for each process, do a lookup in elasticsearch plus another aggregate, to get as the final result a list of: process, processID, total value?
I tried to do another aggregation like this by processID, but in the final result only the first aggregation appears, with an empty process.
I want to replicate each line of the first aggregation for each processID.

I cannot really answer that without understanding more about the data layout.

Note that you can stash additional fields in the map, even if they are constant for a given task id, and they will get included in the aggregated event.
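
For instance, something like this (the field names are hypothetical, just to illustrate stashing a per-task constant):

aggregate {
  task_id => "%{process_name}"
  code => "
    # category is constant for a given process_name, so stash it once
    map['category'] ||= event.get('category')
    map['total'] ||= 0; map['total'] += event.get('value').to_i
  "
  push_map_as_event_on_timeout => true
  timeout => 6
}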

In my data source I have something like:
process_name ; process_id ; value
process1; 1; 10
process2; 2; 20
process1; 3; 5
process2; 4; 10

And I need the avg by process_name alongside the value for each process_id, so I can compare them, like:
process_name; process_id; value; avg
process1; 1; 10; 7.5
process1; 3; 5; 7.5
process2; 2; 20; 15
process2; 4; 10; 15

So, we want to add the field avg (by process_name), which we calculate in the aggregation, to each line. And this avg value has to be replicated for each process_id with the same process_name.
We tried to create two aggregations with an elasticsearch lookup, but no success yet.
Can you help us?

I see you asked a question about getting elasticsearch to do the aggregation. That is fundamentally a better approach than this. That said, if I have a csv like this

process_name,process_id,value
process1,1,10
process2,2,20
process1,3,5
process2,4,10

I can run a filter like this

csv { autodetect_column_names => true }
aggregate {
    task_id => "%{process_name}"
    code => "
        map['lines'] ||= 0; map['lines'] += 1;
        map['total'] ||= 0; map['total'] += event.get('value').to_i;
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "processName"
    timeout => 6 # seconds
    timeout_code => "event.set('average', event.get('total').to_f/event.get('lines').to_f)"
}

That will output the 4 lines from the csv, and then, several seconds later, output 2 more events with average set to 7.5 and 15.0.
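
For reference, given that csv the two timeout events should look roughly like this (rubydebug-style, ignoring @timestamp and the other housekeeping fields):

{ "processName" => "process1", "lines" => 2, "total" => 15, "average" => 7.5 }
{ "processName" => "process2", "lines" => 2, "total" => 30, "average" => 15.0 }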

The only reason I used a different value for timeout_task_id_field is to make it clear what that option does.

Thanks! However, we want to keep process_id in the output, and that's why it doesn't work for us. How can we add the process_id and keep the average by process_name?
We want to have this output:
process_name; process_id; value; avg
process1; 1; 10; 7.5
process1; 3; 5; 7.5
process2; 2; 20; 15
process2; 4; 10; 15

Well, it could be done. You could ingest each row into an array in the map, and then at the end iterate over the array and spit out each row with the average appended. But that's a terrible design (it ingests the entire data set into memory), and I'm not going to encourage you to do it by writing the code for you :smiley: Doing the aggregation in elasticsearch is a far better idea.

Thanks! I also tried to aggregate in elasticsearch, but I am facing another issue. Can you please see this topic: Logstash - aggregation_fields in elasticsearch filter plugin?
Maybe I am missing something.

Yeah, I looked at that. The configuration looks correct, given the es output you show. I don't have an elasticsearch server I can experiment against.

Thanks a lot!! :smiley: We will keep trying!

We found the solution: we were using query instead of query_template in the elasticsearch filter plugin.
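
For anyone who lands here later: query only takes a query-string query, so it cannot carry an aggs section, while query_template points at a file containing the full request body. Something along these lines (paths and field names are placeholders, and this assumes the template supports the usual %{} field references):

filter {
  elasticsearch {
    hosts => "XXX"
    index => "logs_0"
    query_template => "/etc/logstash/avg_by_process.json"
    aggregation_fields => { "avg_value" => "avg" }
  }
}

where avg_by_process.json contains something like:

{
  "size": 0,
  "query": { "term": { "process_name": "%{process_name}" } },
  "aggs": { "avg_value": { "avg": { "field": "value" } } }
}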

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.