Push_map_as_event_on_timeout undocumented side effects (Logstash version 6.8.1)

The earlier part of my config contains a lot of regex which breaks up a message from /var/log/php7.0-fpm.log into a number of parts and assigns [@metadata][php_variety]. Additional processing is then performed based on the results. The test data I'm using for this config (with some info redacted) is:

[25-Jul-2019 12:12:07] WARNING: [pool mypoolname] child 22377 said into stderr: "NOTICE: PHP message: PHP Fatal error:  Uncaught Exception: Test Exception in /var/www/..."
[25-Jul-2019 12:12:07] WARNING: [pool mypoolname] child 22377 said into stderr: "Stack trace:"
[25-Jul-2019 12:12:07] WARNING: [pool mypoolname] child 22377 said into stderr: "#0 /var/www/..."
[25-Jul-2019 12:12:07] WARNING: [pool mypoolname] child 22377 said into stderr: "#1 /var/www/..."
[25-Jul-2019 12:12:07] WARNING: [pool mypoolname] child 22377 said into stderr: "#2 /var/www/..."
[25-Jul-2019 12:12:07] WARNING: [pool mypoolname] child 22377 said into stderr: "#3 /var/www/..."
[25-Jul-2019 12:12:07] WARNING: [pool mypoolname] child 22377 said into stderr: "#4 /var/www/..."

I'm not using a multiline codec, so each of these lines is processed individually. The initial processing also assigns [@metadata][php_variety], which is later converted into an event field via

    # add variety to events out
    ruby {
        code => "event.set('variety', event.get('[@metadata][php_variety]'))"
    }

Therefore, when no aggregation is applied, these become:

{"host":"oursite.domain","path":"/var/log/php7.0-fpm.log","message":" Uncaught Exception: Test Exception in /var/www/...","php_error":"Fatal","variety":"child_msg_php_error","severity":"NOTICE","@timestamp":"2019-07-25T20:06:03.544Z","timestamp":"25-Jul-2019 13:06:02","stream":"stderr","type":"php_error","client":"mypoolname","pid":"22696","php-fpm-level":"WARNING","@version":"1"}
{"host":"oursite.domain","path":"/var/log/php7.0-fpm.log","message":"Stack trace:","variety":"child_msg","@timestamp":"2019-07-25T20:06:03.695Z","timestamp":"25-Jul-2019 13:06:02","stream":"stderr","type":"php_error","client":"mypoolname","pid":"22696","php-fpm-level":"WARNING","@version":"1"}
{"host":"oursite.domain","path":"/var/log/php7.0-fpm.log","message":"#0 /var/www/...","variety":"child_msg","@timestamp":"2019-07-25T20:06:03.695Z","timestamp":"25-Jul-2019 13:06:02","stream":"stderr","type":"php_error","client":"mypoolname","pid":"22696","php-fpm-level":"WARNING","@version":"1"}
{"host":"oursite.domain","path":"/var/log/php7.0-fpm.log","message":"#1 /var/www/...","variety":"child_msg","@timestamp":"2019-07-25T20:06:03.695Z","timestamp":"25-Jul-2019 13:06:02","stream":"stderr","type":"php_error","client":"mypoolname","pid":"22696","php-fpm-level":"WARNING","@version":"1"}
{"host":"oursite.domain","path":"/var/log/php7.0-fpm.log","message":"#2 /var/www/...","variety":"child_msg","@timestamp":"2019-07-25T20:06:03.696Z","timestamp":"25-Jul-2019 13:06:02","stream":"stderr","type":"php_error","client":"mypoolname","pid":"22696","php-fpm-level":"WARNING","@version":"1"}
{"host":"oursite.domain","path":"/var/log/php7.0-fpm.log","message":"#3 /var/www/...","variety":"child_msg","@timestamp":"2019-07-25T20:06:03.696Z","timestamp":"25-Jul-2019 13:06:02","stream":"stderr","type":"php_error","client":"mypoolname","pid":"22696","php-fpm-level":"WARNING","@version":"1"}
{"host":"oursite.domain","path":"/var/log/php7.0-fpm.log","message":"#4 /var/www/...","variety":"child_msg","@timestamp":"2019-07-25T20:06:03.696Z","timestamp":"25-Jul-2019 13:06:02","stream":"stderr","type":"php_error","client":"mypoolname","pid":"22696","php-fpm-level":"WARNING","@version":"1"}

The issue arises when attempting to concatenate these into a single message via the following rules:

# concat PHP stack traces - these only begin in events of variety child_msg_php_error
if [@metadata][php_variety] == "child_msg_php_error" {
    if [php_error] == "Fatal" {
        aggregate {
            task_id => "%{pid}"
            code => "map['php_fpm_php_stacktrace'] = event.get('message')"
            map_action => "create"
        }
    
        drop {}
    }
}

# php stack traces don't have an "end of stack trace" message, so we get to use timeouts instead
if [message] =~ "#\d .*" {
    aggregate {
        task_id => "%{pid}"
        code => "map['php_fpm_php_stacktrace'] << '\n' + event.get('message')"
        map_action => "update"
        timeout => "1" # the whole stack trace should pretty much come through at once...
        timeout_tags => ["_php_stacktrace_timeout"] # set a tag so we can indicate not to drop the event
        timeout_task_id_field => "pid"
        push_map_as_event_on_timeout => true
        #timeout_code => "event.set('message', map['php_fpm_php_stacktrace'])"
    }

    # drop event unless end of stacktrace tag is set
    if "_php_stacktrace_timeout" not in [tags] {
        drop {}
    }

    # otherwise, set the meta stuff back to what it was for the original message
    mutate {
        update => { "[@metadata][php_variety]" => "child_msg_php_error" }
        add_field => { "php_error" => "Fatal" }
    }
}

As per the documentation, push_map_as_event_on_timeout causes a new event by the name of php_fpm_php_stacktrace to be generated.
However, this is also causing undesirable side effects. It appears to be destroying data that is not referenced anywhere within the aggregate block: variety is null in the output file, and the following warning appears in Logstash's console output:

[WARN ] 2019-07-25 15:16:48.509 [[main]>worker0] gelf - Trouble sending GELF event {:gelf_event=>{"short_message"=>nil, "full_message"=>"%{message}", "host"=>"%{host}", "_php_error"=>"Fatal", "_php_fpm_php_stacktrace"=>" Uncaught Exception: Test Exception in /var/www/...\\n#0 /var/www/...\\n#1 /var/www/...\\n#2 /var/www/\\n#3 /var/www/\\n#4 /var/www/...", "_tags"=>"_php_stacktrace_timeout", "_pid"=>"23630", "level"=>6}, :event=>#<LogStash::Event:0x2dbe6dcc>, :error=>#<ArgumentError: short_message is missing. Options version, short_message and host must be set.>}

I am certain that this error is due to the timeout event, as commenting out the drop {}s for non-timed-out events allows every event except the final one to be sent and processed with no errors or problems whatsoever.

You are using push_map_as_event_on_timeout and cancelling every other event (btw, the normal way to do this would be to add event.cancel to the aggregate code). The only fields that will be on the event that aggregate flushes are the timeout_task_id_field and the contents of the map. If you want variety to be on the events then add it to the map.
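
As a rough illustration of that suggestion, here is what the Ruby inside the first aggregate block's code option might do, exercised outside Logstash. StubEvent and start_trace are hypothetical names used only for this sketch. StubEvent mimics just the get, cancel and cancelled? methods of the real event object, and the sketch assumes the variety field has already been set on the event before aggregate runs.

```ruby
# Hypothetical stand-in for the Logstash event object; it implements only
# the three methods this sketch needs.
class StubEvent
  def initialize(fields)
    @fields = fields
    @cancelled = false
  end

  def get(key)
    @fields[key]
  end

  def cancel
    @cancelled = true
  end

  def cancelled?
    @cancelled
  end
end

# What the body of the `code` option might do: copy the fields that must
# survive into the map, then cancel the line event so that no separate
# drop {} is needed after the aggregate filter.
def start_trace(event, map)
  map['variety'] = event.get('variety') # assumes 'variety' is already set
  map['php_fpm_php_stacktrace'] = event.get('message').dup
  event.cancel
end

map = {}
event = StubEvent.new(
  'variety' => 'child_msg_php_error',
  'message' => ' Uncaught Exception: Test Exception in /var/www/...'
)
start_trace(event, map)

puts map.inspect
puts "cancelled: #{event.cancelled?}"
```

In the real config the three statements inside start_trace would sit in the code string, separated by semicolons or newlines, with event and map supplied by the aggregate filter.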

In other words, the final event on which the error triggers is a separate event from the event caused by the final line of the stack trace?

The input creates events for each line. The aggregate filter will create an additional event whenever a timeout occurs. The event created by aggregate does not flow through the filters that precede aggregate in the configuration, but does flow through those after it.
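
A toy model may make that flow easier to picture. Nothing below is Logstash code: the filter chain is just an array of lambdas acting on plain hashes, and FILTERS, AGGREGATE_INDEX and run_pipeline are names invented for this sketch.

```ruby
# Toy model of a filter chain; each filter mutates a Hash standing in for
# an event.
FILTERS = [
  ->(e) { e['variety'] = 'child_msg' },               # runs before aggregate
  ->(e) { e },                                        # position of aggregate
  ->(e) { (e['tags'] ||= []) << 'after_aggregate' }   # runs after aggregate
].freeze
AGGREGATE_INDEX = 1

# Run an event through the chain, starting at the given filter index.
def run_pipeline(event, start_index = 0)
  FILTERS[start_index..-1].each { |filter| filter.call(event) }
  event
end

# An event created by the input starts at the top of the chain.
line_event = run_pipeline({ 'message' => '#0 /var/www/...' })

# An event flushed by aggregate on timeout enters just after aggregate,
# so the filter that sets 'variety' never sees it.
timeout_event = run_pipeline({ 'pid' => '22696' }, AGGREGATE_INDEX + 1)

puts line_event.inspect
puts timeout_event.inspect
```

In this model the timeout event picks up the tag from the last filter but never gets a variety field, which matches the null variety seen in the output.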

I see, thanks for the explanation. That would explain the effects. What I really need is to be able to push the final stacktrace event, as it contains several points of required data. Is there a builtin mechanism by which this can be done, other than assigning every piece of data I need to a map and reconstructing the event using the saved map upon timeout?

If the only way to recognize the final line of the stack trace is the fact that there are no subsequent lines for a while then I think an aggregate+timeout is the only way.

For anybody who encounters this thread in the future - Solution used: buffer initial data upon initial aggregate event, reconstruct event upon timeout event from buffered data:

            aggregate {
                task_id => "%{pid}"
                # copy all key/value pairs we want into map
                code => "map['php_fpm_php_stacktrace'] = {}; ['variety', 'client', 'severity', 'stream', '@timestamp', 'host', 'php-fpm-level', 'type', 'timestamp', 'php_error', 'message'].each { |k| map['php_fpm_php_stacktrace'][k] = event.get(k) }"
                map_action => "create"
            }

...

            aggregate {
                task_id => "%{pid}"
                code => "map['php_fpm_php_stacktrace']['message'] << '\n' + event.get('message')"
                map_action => "update"
                timeout => "1" # the whole stack trace should pretty much come through at once...
                timeout_task_id_field => "pid"
                push_map_as_event_on_timeout => true
                timeout_code => "event.get('php_fpm_php_stacktrace').each { |k, v| event.set(k, v); }; event.remove('php_fpm_php_stacktrace')"
            }
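
To see what the timeout_code does to the flushed event, the same two statements can be run against a stand-in object outside Logstash. This is only a sketch: StubEvent is a hypothetical class that mimics the get, set, remove and to_hash methods of the real event, and the map contents are a shortened version of the test data.

```ruby
# Hypothetical stand-in for the Logstash event object, backed by a Hash.
class StubEvent
  def initialize(fields = {})
    @fields = fields
  end

  def get(key)
    @fields[key]
  end

  def set(key, value)
    @fields[key] = value
  end

  def remove(key)
    @fields.delete(key)
  end

  def to_hash
    @fields
  end
end

# State of the map once the first line and two stack frames were buffered.
map = {
  'php_fpm_php_stacktrace' => {
    'variety' => 'child_msg_php_error',
    'php_error' => 'Fatal',
    'message' => " Uncaught Exception: Test Exception\n#0 /var/www/...\n#1 /var/www/..."
  }
}

# On timeout the new event holds the task id field plus the map contents.
event = StubEvent.new({ 'pid' => '22696' }.merge(map))

# Same two statements as the timeout_code: lift the buffered fields to the
# top level, then remove the now-redundant nested hash.
event.get('php_fpm_php_stacktrace').each { |k, v| event.set(k, v) }
event.remove('php_fpm_php_stacktrace')

puts event.to_hash.inspect
```

After the two statements run, the buffered fields sit at the top level next to pid, so filters and outputs after aggregate see an event shaped like the original first line.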