How to make sum(field)-sum(field2) and get a single result for all documents

I have a MongoDB collection that contains two fields, amount and type; type is either cash-in, cash-out, or transfer. I want to index into Elasticsearch with a new field that contains (the sum of all cash-in amounts) - (the sum of all cash-out amounts). PS: every document pushed is either a cash-in, a cash-out, or a transfer. Every time I add a new document the field should update itself.
Is there something like this?

> if [type] == "cash-in" {
>   mutate {
>     add_field => {
>       "cash-in" => "%{amount}"
>     }
>   }
> }
> if [type] == "cash-out" {
>   mutate {
>     add_field => {
>       "cash-out" => "%{amount}"
>     }
>   }
> }
> 
> mutate {
>   add_field => {
>     "total" => "sum%{cash-in}-sum%{cash-out}"
>   }
> }

It could be done with an aggregate filter

    mutate { add_field => { "[@metadata][task]" => "constant" } }
    aggregate {
        task_id => "%{[@metadata][task]}"
        code => '
            map["total"] ||= 0
            t = event.get("type")
            if t == "cash-in"
                map["total"] += event.get("amount")
            elsif t == "cash-out"
                map["total"] -= event.get("amount")
            end
            event.set("total", map["total"])
        '
    }

That requires '--pipeline.workers 1', so it does not scale, and it makes assumptions about event ordering that I do not think are guaranteed by logstash (every use of aggregate does that).
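The logic inside that aggregate `code` block can be sketched in plain Ruby. This is a standalone simulation, not Logstash itself: the `events` array of hashes is hypothetical sample data standing in for the event stream, and `map` plays the role of the aggregate filter's shared map.

```ruby
# Hypothetical stream of events, each with "type" and "amount".
events = [
  { "type" => "cash-in",  "amount" => 100 },
  { "type" => "cash-out", "amount" => 30 },
  { "type" => "transfer", "amount" => 50 },  # neither branch matches, so it is ignored
  { "type" => "cash-in",  "amount" => 20 },
]

map = {}
events.each do |event|
  map["total"] ||= 0                      # initialize once, on the first event
  case event["type"]
  when "cash-in"  then map["total"] += event["amount"]
  when "cash-out" then map["total"] -= event["amount"]
  end
  event["total"] = map["total"]           # each event carries the running total so far
end

map["total"]  # => 90  (100 - 30 + 20)
```

Because the total is a single running value updated per event, the order in which events arrive matters, which is why multiple pipeline workers would break it.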


Thank you @Badger, I have three questions though:
- Can you please explain the first three lines to me?
- Does map["total"] ||= 0 create the total field automatically and assign it? Because I don't have that field created.
- I'm a newbie at this, so please bear with me. What I've understood from your last couple of lines (outside the code) is that I need to set --pipeline.workers 1 when launching the logstash pipeline because of concurrency issues? But in the long term the data will probably increase and I'll need to use more CPUs, no?

An aggregate filter requires a task_id so that it can aggregate lines that are associated with the same event. In this case you need to aggregate every line together, so we use a constant value for the task_id. That constant value is in the [@metadata] object so that it does not get indexed.
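As a minimal sketch of that last point (the elasticsearch output and its options are omitted here), a field under [@metadata] is visible to filters but is dropped from the event on output by default:

```
filter {
  mutate { add_field => { "[@metadata][task]" => "constant" } }
}
output {
  # [@metadata] fields are not part of the document that gets indexed;
  # the rubydebug codec only displays them if asked explicitly:
  stdout { codec => rubydebug { metadata => true } }
}
```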

'map["total"] ||= 0' is a ruby idiom that sets map["total"] to zero if it does not already exist. It only has an effect the first time through the filter.
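The idiom is easy to see in isolation. A small Ruby example (using a plain hash in place of the aggregate filter's map):

```ruby
map = {}
map["total"] ||= 0   # key is absent, so it is created and set to 0
map["total"] += 5
map["total"] ||= 0   # key already exists (and 5 is truthy), so this does nothing
map["total"]         # => 5
```

So in the filter it both creates the entry on the first event and leaves it untouched on every later event.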

You have to use '--pipeline.workers 1' when using an aggregate filter. It does not scale. It does not matter how many CPUs you have, it can only use 1.


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.