Combine Information from different log lines into one event

Hello team,
I have a case where information is displayed in different lines. These lines are not even consecutive. The unique field is the order ID. I have to trigger an email if the OrderID with Exit comes more than 30 min after the OrderID with Entry.

2021-11-07 07:11:02.002015|OrderId=100 Entry
2021-11-07 07:11:02.002016|blah1
2021-11-07 07:11:02.002017|blah2
2021-11-07 07:11:02.002018|blah3
2021-11-07 08:11:02.002019|OrderId=100 Exit Symbol=APPLE Price=99 Quantity=100

In the above log the difference between the first log line (Entry) and the last (Exit) is more than 30 min, so we need to trigger mail.

Can you please help me on this.

Use aggregate filters. A combination of example 3, to get the timeout, and example 2 to disable the timeout if the second line arrives in time.

    grok { match => { "message" => "%{TIMESTAMP_ISO8601:[@metadata][ts]}\|OrderId=%{INT:orderId} %{WORD:inOrOut}" } }
    # the log carries a four-digit year, so yyyy rather than YY
    date { match => [ "[@metadata][ts]", "yyyy-MM-dd HH:mm:ss.SSSSSS" ] }

    # example 3: every OrderId line starts (or touches) a task; if nothing ends it
    # within 1800 s the map is pushed as a new event carrying orderId and the tag
    aggregate {
        task_id => "%{orderId}"
        code => ''
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "orderId"
        timeout => 1800
        timeout_tags => ['_aggregatetimeout']
    }

    # example 2: an Exit line that arrives in time ends the task, so no timeout event is produced
    if [inOrOut] == "Exit" {
        aggregate {
            task_id => "%{orderId}"
            end_of_task => true
            code => ''
        }
    }

Then route the timeout event to an email output...

output {
    if "_aggregatetimeout" in [tags] {
        email { ...

You could make use of scripted upserts to calculate the time difference, as described in: Using Logstash and Elasticsearch to calculate transaction duration in a microservices architecture

You could then write an alert to scan for recently completed events where the duration is greater than 30

Hello @Badger,
I tried the following logstash code, but no luck. The duration field is not created in the entry or exit log.

Sample Log line:

{"timestamp":"2021-11-17T05:52:59:613","level":"info","message":"ENTRY: INTERNAL_HTTP_REQUEST","userId":"rahul","companyId":"abc"}

{"timestamp":"2021-11-17T05:52:59:768","level":"info","message":"EXIT: INTERNAL_HTTP_REQUEST","userId":"rahul","companyId":"abc"}

    # [log_processed.message] refers to a field literally named "log_processed.message";
    # if the JSON was decoded into a nested object, use [log_processed][message]
    # and %{[log_processed][companyId]} instead.
    if [log_processed.message] == "ENTRY: INTERNAL_HTTP_REQUEST" {
        aggregate {
            task_id => "%{log_processed.companyId}"
            # event['...'] is the Logstash 2.x API; since 5.x fields are read with
            # event.get and written with event.set, otherwise the code block errors
            code => "map['started'] = event.get('@timestamp')"
            map_action => "create"
        }
    }

    if [log_processed.message] == "EXIT: INTERNAL_HTTP_REQUEST" {
        aggregate {
            task_id => "%{log_processed.companyId}"
            # compare the two LogStash::Timestamp values as float seconds
            code => "event.set('duration', event.get('@timestamp').to_f - map['started'].to_f)"
            map_action => "update"
            push_map_as_event_on_timeout => true
            timeout_task_id_field => "log_processed.companyId"
            timeout => 60 # 1 minute timeout
            timeout_tags => ['_aggregatetimeout']
        }
    }
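Added example (not from the thread, and not Logstash's own code): a plain-Ruby model of what the two aggregate configurations in this thread do, the timeout design in Badger's answer and the Entry-to-Exit duration the later configuration tries to compute. Ruby is used because that is the language of the aggregate filter's `code` blocks. It assumes the `OrderId=... Entry/Exit` line format from the question; the log lines inside it are constructed sample data, and `parse_line`, `aggregate`, `TIMEOUT` and `SAMPLE_LOG` are names chosen here.

```ruby
require 'time'

TIMEOUT = 1800 # seconds, the `timeout => 1800` of the first configuration

# Format of the OrderId log lines from the question; everything else is skipped,
# just as lines that fail the grok pattern would be.
LINE_RE = /\A(?<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+)\|OrderId=(?<id>\d+) (?<dir>Entry|Exit)/

# Constructed sample modelled on the lines in the thread (assumed data, not a real log):
# order 100 exits an hour after entry, 101 exits in time, 102 never exits.
SAMPLE_LOG = <<~LOG
  2021-11-07 07:11:02.002015|OrderId=100 Entry
  2021-11-07 07:11:02.002016|blah1
  2021-11-07 07:12:00.000000|OrderId=101 Entry
  2021-11-07 07:20:00.000000|OrderId=101 Exit Symbol=APPLE Price=99 Quantity=100
  2021-11-07 08:00:00.000000|OrderId=102 Entry
  2021-11-07 08:11:02.002019|OrderId=100 Exit Symbol=APPLE Price=99 Quantity=100
LOG

# Parse one line into {ts:, order_id:, dir:}; nil for lines without an OrderId.
# Timestamps are read as UTC so that differences do not depend on the local zone.
def parse_line(line)
  m = LINE_RE.match(line) or return nil
  { ts: Time.parse("#{m[:ts]} UTC"), order_id: m[:id], dir: m[:dir] }
end

# Replay lines in arrival order with one map per task_id (the orderId), as the
# aggregate filter does. Returns the events the filter would emit:
#   {type: :duration, ...} when an Exit ends a live task (the duration the second
#                          configuration computes on the Exit event)
#   {type: :timeout, ...}  when a task has had no Exit for TIMEOUT seconds (the
#                          push_map_as_event_on_timeout event of the first one)
# `now` expires tasks still open after the last line, which Logstash does on its
# periodic flush; without it those tasks are simply left open.
def aggregate(lines, now: nil)
  tasks = {}   # task_id => map
  alerts = []

  expire = lambda do |clock|
    tasks.delete_if do |id, map|
      if clock - map[:started] > TIMEOUT
        alerts << { type: :timeout, order_id: id, tags: ['_aggregatetimeout'] }
        true
      end
    end
  end

  lines.each do |line|
    ev = parse_line(line) or next
    expire.call(ev[:ts])   # Logstash runs this check every few seconds; here on every event
    case ev[:dir]
    when 'Entry'
      tasks[ev[:order_id]] ||= { started: ev[:ts] }   # map_action => "create"
    when 'Exit'
      map = tasks.delete(ev[:order_id])               # end_of_task => true
      next unless map   # Exit whose Entry already timed out (or was never seen): nothing to measure
      alerts << { type: :duration, order_id: ev[:order_id], duration: ev[:ts] - map[:started] }
    end
  end
  expire.call(now) if now
  alerts
end

if __FILE__ == $0
  aggregate(SAMPLE_LOG.lines, now: Time.parse('2021-11-07 08:45:00 UTC')).each { |a| p a }
end
```

Replaying the sample shows why the timeout design is the one that produces the 30 minute alert: by the time the Exit for order 100 arrives, its task has already been expired and tagged `_aggregatetimeout`, so the Exit event finds no map and no duration is ever computed for it. A duration field therefore only exists for orders that completed within the timeout, which is also what the second configuration would observe. In a real pipeline the aggregate filter additionally needs `pipeline.workers` set to 1 (Logstash's `-w 1`), otherwise Entry and Exit lines of the same order can be processed by different workers and the map is never shared.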
