How to share field data between documents of the same file path

I am using Logstash to parse test logs. Some test log files have multiple errors per file. Only the last error in each file contains the "scenario" information, but I need all errors from the same file path to share the same "scenario" field data. I believe I can accomplish this using the mutate and aggregate filters, but I am not sure how to implement it. My intuition is to use the mutate filter to add the scenario field if it doesn't already exist, like this:

if ![scenario] {
    mutate {
        add_field => { "scenario" => "" }
    }
}

and then use aggregate filter plugin to share the field data between all entries with the same file path like this:

aggregate {
    task_id => "%{[log][file][path]}" 
    code => "map['sharedscenario'] ||= '';
             map['sharedscenario'] += event.get('scenario');"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "[log][file][path]"
    timeout => 300
    timeout_tags => ['_aggregatetimeout']
}

So far this has not worked for me. If there is a better way to accomplish this or some obvious errors in my implementation please let me know. Thanks!

You say you want all errors to have the scenario field. I can think of two approaches to that.

The first is to index all the data, then go back and add the scenario to any documents that do not have it. You could do that using Logstash. Configure the index with a boolean field called something like scenarioAdded that defaults to false. Run Logstash with an elasticsearch input that fetches all records that have a [scenario] field and [scenarioAdded] set to false, then feed those to an http output that makes an update-by-query call to Elasticsearch to add [scenario] and set scenarioAdded to true for all documents with the same [log][file][path]. There is an update-by-query example here.
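
To make the update-by-query step more concrete, here is a rough sketch in Ruby of the request body that such a call might carry. The helper name update_by_query_body and the sample path and scenario values are made up for illustration. It also assumes log.file.path is mapped as a keyword so that a term query matches the whole path exactly. The body would be sent as a POST to the index's _update_by_query endpoint.

```ruby
require "json"

# Hypothetical helper: builds a body for POST /<index>/_update_by_query.
# Assumes log.file.path is a keyword field, so the term query matches exactly.
def update_by_query_body(path, scenario)
  {
    "script" => {
      "lang" => "painless",
      # Copy the scenario onto each matching document and mark it as done.
      "source" => "ctx._source.scenario = params.scenario; ctx._source.scenarioAdded = true",
      "params" => { "scenario" => scenario }
    },
    # Select every document that came from the same file.
    "query" => { "term" => { "log.file.path" => path } }
  }
end

# Sample values, not taken from the original logs.
body = update_by_query_body("/logs/a.log", "login")
puts JSON.pretty_generate(body)
```

Passing the scenario through params rather than concatenating it into the script source avoids quoting problems if the value contains special characters.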

The second is to use aggregate. You say the last error has the scenario, so in the aggregate filter you would need to stash all the errors in the map until you see the last one, then you can delete the map entry. The configuration below is not tested; it is just meant to give you a general idea of what I mean:

grok { match => { "message" => "Scenario: %{WORD:scenario}" } }
if ! [scenario] {
    aggregate {
        task_id => "%{[log][file][path]}"
        code => '
            # Assumes you only want to save the message field
            # If necessary you could go all the way to
            # map["errors"] << event.to_hash
            map["errors"] ||= []
            map["errors"] << event.get("message")
            # Drop this event; its message now lives in the map
            event.cancel
        '
    }
} else {
    aggregate {
        task_id => "%{[log][file][path]}"
        code => '
            map["errors"] ||= []
            map["errors"] << event.get("message")
            event.set("errors", map["errors"])
        '
        end_of_task => true
        timeout => 300
    }
    # This event has a [scenario] field, so all those
    #  created by split will have one too
    split { field => "errors" }
}
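
To show what that configuration is meant to do, here is a small plain-Ruby simulation of the same buffering logic that runs without Logstash. It is only a sketch: the helper name share_scenario, the flat "path" key, and the sample events are invented for illustration, and the real filters operate on Logstash event objects rather than hashes.

```ruby
# Simulates the buffer-until-scenario idea in plain Ruby (no Logstash involved).
# The event shape and the helper name share_scenario are hypothetical.
def share_scenario(events)
  maps = Hash.new { |hash, key| hash[key] = [] } # one buffer per file path, like the aggregate map
  output = []
  events.each do |event|
    path = event["path"]
    maps[path] << event["message"]
    # No scenario yet: keep buffering, which mirrors event.cancel.
    next unless event["scenario"]
    # Scenario seen: emit one event per buffered message (the job of split),
    # each carrying the scenario, then drop the buffer (end_of_task).
    maps.delete(path).each do |message|
      output << { "path" => path, "errors" => message, "scenario" => event["scenario"] }
    end
  end
  output
end

# Sample events, not taken from the original logs.
events = [
  { "path" => "/logs/a.log", "message" => "error 1" },
  { "path" => "/logs/b.log", "message" => "error X" },
  { "path" => "/logs/a.log", "message" => "error 2 Scenario: login", "scenario" => "login" }
]
result = share_scenario(events)
result.each { |event| puts event }
```

Note that the error from /logs/b.log never appears in the output because no scenario line arrived for that path. In the real pipeline those cancelled events would likewise be lost once the timeout expires, so this approach relies on every file eventually producing a line with a scenario. The aggregate filter also needs a single pipeline worker (-w 1) so that events from one file are processed in order.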

Awesome, thanks! I used your aggregation implementation and it worked like a charm.
