How to conditionally aggregate logs

Hi, I am trying to aggregate the following logs:

2023-09-06 07:36:22,573 |  INFO | Thread-934 | Config | ENTERORDER: identifier = 'Barbie', buy = false, quantity = 290000.0, price = 96.1, account = '123', reference = '', customerId = '', originalBrokerId = 'ken'
2023-09-06 07:36:22,574 |  INFO | Thread-934 | Config | processed: storeSchema = barcode, storeId = 'Barbie', currency = 'CHF', account = '123', quantity = 290000.0, allocAccounts = 
2023-09-06 07:36:22,574 |  INFO | Thread-934 | Config | setOrigin = 'New'
2023-09-06 07:36:22,626 |  INFO | Thread-934 | Config | glowServiceImpl::validateOrder::finally sw2 (49ms)
2023-09-06 07:36:22,630 |  INFO | Thread-934 | Config | OrderUtils::validateOrder::setglowChecked value = 'Y:(55ms)'

The tags should end up on one line, with the messages concatenated. If the aggregate filter is removed, the events are processed successfully, but with aggregate I cannot make it work. I have tried multiple examples from the Logstash documentation page, but still no success. Many thanks in advance.

Here is my code:

input {
    pipeline {
        address => "logging-glow"
    }
}

filter {

    # Extract response_glow, appl_validation_time_ms, validation_interface_type, glow_roundtrip_ms, order_details, order_origin
    if ("ms" in [message]) or ("ENTERORDER" in [message]) or ("glowServiceImpl" in [message]) or ("setOrigin" in [message]){
        grok{
            match  => {
                break_on_match => "true"
                "message" => [
                    ".*::%{GREEDYDATA:validation_interface_type}::.*'%{DATA:response_glow}:.*\(%{DATA:appl_validation_time_ms}\m",
                    "glowServiceImpl::%{GREEDYDATA:validation_interface_type}::.*\(%{DATA:glow_roundtrip_ms}\m",
                    "ENTERORDER: %{GREEDYDATA:order_details}",
                    ".*'%{GREEDYDATA:order_origin}'"
                ]
            }
        }
    }

    # Extract timestamp, severity, log_thread
    grok {
        break_on_match => false
        match => {
            "message" => ["%{TIMESTAMP_ISO8601:[@metadata][timestamp]}%{SPACE}\|%{SPACE}%{LOGLEVEL:severity}%{SPACE}\|%{SPACE}%{DATA:log_thread}%{SPACE}\|"]
        }
    }

    # Set timestamp
    date {
        match => ["[@metadata][timestamp]", "ISO8601"]
        target => "@timestamp"
        tag_on_failure => ["_dateparsefailure"]
    }

    # Set component to glow
    mutate {
        add_field => { "component" => "glow" }
    }

    # Set a unique document ID for each message to avoid duplicates
    fingerprint {
        source => ["@timestamp", "message"]
        target => "[@metadata][fingerprint]"
        concatenate_sources => true
    }
    # Check if a map for this thread already exists; if not, create one, otherwise update it.
    # Keep the max timestamp and aggregate the tags of the same thread every day.
    aggregate {
        task_id => "%{thread_id}"
        timeout_task_id_field => "thread_id"
        timeout_timestamp_field => "@timestamp"
        code => "map['validation_interface_type'] = event.get('validation_interface_type')"
        push_previous_map_as_event => true
    }
}



output {
    file {
        path => "D:\APPL\logstash\Temp\logstash-logging-glow.log"
    }
}

If thread_id is not set, then the task_id is always the same value (the literal string %{thread_id}), so the aggregate will never flush the map, because it never sees a different value for the task id.
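
A minimal sketch of the fix, assuming you want to group by the thread name that your second grok pattern already extracts as log_thread (e.g. "Thread-934"), would be to reference that field as the task id:

aggregate {
    # Group events by the grokked thread name instead of a non-existent field
    task_id => "%{log_thread}"
    timeout_task_id_field => "log_thread"
    timeout_timestamp_field => "@timestamp"
    code => "map['validation_interface_type'] ||= event.get('validation_interface_type')"
    push_previous_map_as_event => true
    # Illustrative value: flush a thread's map if no new line arrives within 120 seconds
    timeout => 120
}

With push_previous_map_as_event, the map for one thread is then pushed as a new event as soon as a line with a different task id (or the timeout) is seen.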

When you use push_previous_map_as_event, the resulting event will only contain whatever fields you add to the map in the aggregate filter's code option, plus @version and @timestamp.
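
So to get the tags on one line and the messages collected together, they have to be copied into the map explicitly in the code option. A rough sketch; the messages, all_tags, and max_timestamp field names are just illustrative:

aggregate {
    task_id => "%{log_thread}"
    timeout_task_id_field => "log_thread"
    timeout_timestamp_field => "@timestamp"
    push_previous_map_as_event => true
    code => "
        # Collect the messages of the same thread into one array field
        # (join it in a later mutate if a single string is needed)
        map['messages'] ||= []
        map['messages'] << event.get('message')
        # Merge the tags arrays, dropping duplicates
        map['all_tags'] ||= []
        map['all_tags'] |= (event.get('tags') || [])
        # Record the timestamp of the most recent line for this thread
        map['max_timestamp'] = event.get('@timestamp')
        # Drop the per-line events so only the aggregated event remains
        event.cancel
    "
}

Fields set by the other filters (for example component from the mutate) are not carried over automatically either; if the aggregated event needs them, add them to the map the same way.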
