Add two value from two record

I am gone a receive two line of record every few min.
example

name : total
code1 : 100
code2: 101

I would like to create a record

{
    name: code1
    total: 100
   combine_total: 201
}
{
   name:code2
   total: 101
   combine_total:201
}

how do I achieve this in logstash. is it even possible?
combine_total = both total +

Pretty much anything is possible in logstash. You could do this in an aggregate filter. There are a couple of approaches, but in both you will have to save the name and code1 values in the map, then event.cancel those events. Once you receive the code2 value, create an array that contains both records you want in elasticsearch and then use a split filter to convert it to two events. You may then need to move fields to the top level.

Note, you will need a task_id on the events to combine them. That can be a constant.

mutate { add_field => { "myTaskId" => "1" } }

If it is literally always name/code1/code2 you could do something like example 1 in the documentation. map_action would be create for 'name', update for 'code1' and 'code2', and end_of_task would be set for 'code2'.

Alternatively, use something like example 3, where you set push_map_as_event_on_timeout to true.

Badgere,
yes it is always code1/code2 let me test out this.
first I have to understand what it does.
Thanks for pointer.

I have use this but seems failing must be making small mistake

input as I said, only two line key:value pair

name: total
code1 : 100
code2: 101

   mutate { add_field => { "myTaskId" => "1" } }
   aggregate {
         task_id => "%{myTaskId}"
         code => "map['mytotal'] += event.get('total')"
         timeout_task_id_field => "name"   
   }

That will never flush the map. Try adding

    push_map_as_event_on_timeout => true
    timeout => 10

gives me this before printing out data

aggregate - Aggregate exception occurred {:error=>#<NoMethodError: undefined method `+' for nil:NilClass>, :code=>"map['mytotal'] += event.get('total')", :map=>{},

code => "
     map['mytotal'] ||= 0
     map['mytotal'] += event.get('total')
"

that remove the warning but didn't pushed out mytotal in event out

this is what I have now
aggregate {
task_id => "%{mytaskid}"
code => "map['mytotal'] ||= 0 ; map['mytotal'] += event.get('total')"
push_map_as_event_on_timeout => true
timeout => 10
}

Does it help if you disable java_execution?

how do I set/unset that java-execution ?

used this but no output for that mytotal.

/usr/share/logstash/bin/logstash --java-execution=false -f myconf.conf

I ran this in debug mode and got following.

This tells me it is executing code.

[DEBUG] 2020-07-30 12:00:48.695 [[main]>worker0] aggregate - Aggregate successful filter code execution {:code=>"map['mytotal'] ||= 0 ; map['mytotal'] += event.get('totalspace_pb')"}
[DEBUG] 2020-07-30 12:00:48.697 [[main]>worker0] aggregate - Aggregate successful filter code execution {:code=>"map['mytotal'] ||= 0 ; map['mytotal'] += event.get('totalspace_pb')"}

and after my two record printed it exit out

[DEBUG] 2020-07-30 12:04:21.335 [[main]>worker0] pipeline - Flushing {:plugin=>#LogStash::FilterDelegator:0x2e180b41}
[DEBUG] 2020-07-30 12:04:21.337 [[main]>worker0] aggregate - Aggregate remove_expired_maps call with '%{mytaskid}' pattern and 1 maps
[DEBUG] 2020-07-30 12:04:21.340 [[main]-pipeline-manager] mutate - Closing {:plugin=>"LogStash::Filters::Mutate"}
[DEBUG] 2020-07-30 12:04:21.341 [[main]-pipeline-manager] pluginmetadata - Removing metadata for plugin ed4c3a623f55a241f049ace08f1171fafb3e03c4d2fa2fc078ad2ebb60f0dfea
[DEBUG] 2020-07-30 12:04:21.341 [[main]-pipeline-manager] mutate - Closing {:plugin=>"LogStash::Filters::Mutate"}
[DEBUG] 2020-07-30 12:04:21.342 [[main]-pipeline-manager] pluginmetadata - Removing metadata for plugin caa322ce1ac78526fefa1ce5e55e3355811340883f2638227b51324d2ed9f62b
[DEBUG] 2020-07-30 12:04:21.342 [[main]-pipeline-manager] mutate - Closing {:plugin=>"LogStash::Filters::Mutate"}
[DEBUG] 2020-07-30 12:04:21.342 [[main]-pipeline-manager] pluginmetadata - Removing metadata for plugin 6d095b23f3556247184221e34b0076de2235a71b8917af6a9590fbdcee7d66df
[DEBUG] 2020-07-30 12:04:21.342 [[main]-pipeline-manager] ruby - Closing {:plugin=>"LogStash::Filters::Ruby"}
[DEBUG] 2020-07-30 12:04:21.342 [[main]-pipeline-manager] pluginmetadata - Removing metadata for plugin 08d14c095a0a65009727b3a286d6d956a3d906a4b1e74ede20ac36b719eb15dd
[DEBUG] 2020-07-30 12:04:21.343 [[main]-pipeline-manager] ruby - Closing {:plugin=>"LogStash::Filters::Ruby"}
[DEBUG] 2020-07-30 12:04:21.343 [[main]-pipeline-manager] pluginmetadata - Removing metadata for plugin 302b4b0d3e7749db78f0e4cc0b25bba2d1222552e1f4b50528025018b5274a7f
[DEBUG] 2020-07-30 12:04:21.343 [[main]-pipeline-manager] mutate - Closing {:plugin=>"LogStash::Filters::Mutate"}
[DEBUG] 2020-07-30 12:04:21.343 [[main]-pipeline-manager] pluginmetadata - Removing metadata for plugin 0d1ee2ee8c4f830dc71c8a82c46da241d067afb8af3b5c5a225bb98aeb5cc8f1
[DEBUG] 2020-07-30 12:04:21.344 [[main]-pipeline-manager] aggregate - Closing {:plugin=>"LogStash::Filters::Aggregate"}
[DEBUG] 2020-07-30 12:04:21.344 [[main]-pipeline-manager] aggregate - Aggregate close call {:code=>"map['mytotal'] ||= 0 ; map['mytotal'] += event.get('totalspace_pb')"}
[DEBUG] 2020-07-30 12:04:21.345 [[main]-pipeline-manager] pluginmetadata - Removing metadata for plugin 3e191300378cfd3de01e5c057645e800c3a202cbd64ca7a836a8e4fea15cd2be
[DEBUG] 2020-07-30 12:04:21.345 [[main]-pipeline-manager] stdout - Closing {:plugin=>"LogStash::Outputs::Stdout"}
[DEBUG] 2020-07-30 12:04:21.345 [[main]-pipeline-manager] pluginmetadata - Removing metadata for plugin 709d54121e8fc68bc368abcc2dfa1f14aab943d26dc6ff1c45174d139b46abf5
[INFO ] 2020-07-30 12:04:21.346 [[main]-pipeline-manager] pipeline - Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x62041bb2@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:197 run>"}
[DEBUG] 2020-07-30 12:04:21.477 [LogStash::Runner] os - Stopping
[DEBUG] 2020-07-30 12:04:21.478 [LogStash::Runner] jvm - Stopping
[DEBUG] 2020-07-30 12:04:21.479 [LogStash::Runner] persistentqueue - Stopping
[DEBUG] 2020-07-30 12:04:21.480 [LogStash::Runner] deadletterqueue - Stopping
[DEBUG] 2020-07-30 12:04:21.481 [LogStash::Runner] agent - Shutting down all pipelines {:pipelines_count=>0}
[DEBUG] 2020-07-30 12:04:21.482 [LogStash::Runner] agent - Converging pipelines state {:actions_count=>0}
[INFO ] 2020-07-30 12:04:21.493 [LogStash::Runner] runner - Logstash shut down.

I would expect it to be

/usr/share/logstash/bin/logstash --java-execution false -f myconf.conf

with no =

same result with = or no =

Badger,
Try all kind of different combination but nothing has workout.

flush is getting called, so it is not a problem with java_execution. However it never gets to

@logger.debug("Aggregate remove expired map with task_id=#{key}")

which suggests there are no map entries. You have made many changes to the filter configuration. What does it currently look like?

did so many different combination. Here is current setup and input and output
input is only two line data

subpool    totalspace       totalfreespace
usi10          3.55                  1.29
usi20          3.3                    1.57


   mutate { add_field => { "mytaskid" => "1" } }
   aggregate {
         task_id => "%{mytaskid}"
         code => "map['mytotal'] ||= 0 ; map['mytotal'] += event.get('totalspace')"
         push_map_as_event_on_timeout => true
         timeout_task_id_field => "%{subpool}"
         timeout => 1
         timeout_tags => ['_aggregatetimeout']
         add_field => { "did_it_worked" => "then set this %{mytotal}" }
   }



{
             "@version" => "1",
        "totalspace" => 3.55,
           "@timestamp" => 2020-07-31T17:10:17.989Z,
          "totalfreespace" => 1.29,
       "did_it_worked" => "then set this %{mytotal}",
              "subpool" => "usi10",
             "mytaskid" => "1"
}
{
             "@version" => "1",
        "totalspace" => 3.3,
           "@timestamp" => 2020-07-31T17:10:17.990Z,
           totalfreespace" => 1.57,
        "did_it_worked" => "then set this %{mytotal}",
              "subpool" => "usi20",
             "mytaskid" => "1"
}

I wonder if using such a small timeout is causing an issue. Can you increase the timeout to 10?

timeout 10 didn't work

is it because input is coming from jdbc? it seems to be like example 4 in this link

https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html