Logstash Aggregate Plugin: Unify two different task_id

Hello, I'm new here.
I am trying to combine four different Windows events, received via Beats, into a single event. Specifically, these are the four events that ADFS generates when someone authenticates.
The events are linked by two different IDs, the Activity ID and the Instance ID, so I need to correlate on both of them.

For example:
Activity ID: 299 -> 403 -> 410
Instance ID: 299 -> 500

#Event 299:#############################################################
Instance ID: 9f1f2c05-73c0-478a-810a-08701667f497 

Activity ID: 00000000-0000-0000-3f41-00800000009c 

Relying party: urn:federation:MicrosoftOnline
########################################################################
#Event 500:#############################################################
More information for the event entry with Instance ID 9f1f2c05-73c0-478a-810a-08701667f497. There may be more events with the same Instance ID with more information. 
Instance ID:  
9f1f2c05-73c0-478a-810a-08701667f497 

Issued identity: 
http://schemas.xmlsoap.org/claims/UPN 
login@domain.com 
########################################################################
#Event 403:#############################################################
An HTTP request was received. 

Activity ID: 00000000-0000-0000-3f41-00800000009c 

Request Details: 
    Date And Time: 2019-06-13 15:40:36 
    Client IP: 1.1.1.1
########################################################################
#Event 410:#############################################################
Following request context headers present : 

Activity ID: 00000000-0000-0000-3f41-00800000009c  

X-MS-Client-Application: Microsoft.Exchange.ActiveSync 
X-MS-Client-User-Agent: LG-M700/1.81.17 
client-request-id: 00000000-0000-0000-3f41-00800000009c 
########################################################################

I am trying to do it in the following way without success:

# Pipeline as originally attempted: correlate ADFS events first by Activity ID,
# then by Instance ID.
input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  # Capture the text that follows "Activity ID:" into the activityid field.
  grok {
    match => [
      "message", "(?<activityid>(?<=Activity ID:)(.{35,39}))"
    ]
  }
  # Collect the messages that share an activityid into map['line'] and drop
  # the individual events.
  # NOTE(review): push_map_as_event_on_timeout is not set on this aggregate,
  # so the collected map is presumably never emitted as an event - confirm.
  aggregate {
      task_id => "%{activityid}"
      code => "
      map['line'] ||= []
      map['line'] << {'message' => event.get('message')}
      event.cancel()
      "
      timeout_task_id_field => "activityid"
      timeout => 2
  }
  # Capture the text that follows "Instance ID:" in the line field into instanceid.
  grok {
    match => [
      "line", "(?<instanceid>(?<=Instance ID:)(.{37,40}))"
    ]
  }
  # Collect the line values that share an instanceid into map['log_unify'];
  # the map is pushed as a new event when the task times out.
  aggregate {
      task_id => "%{instanceid}"
      code => "
      map['log_unify'] ||= []
      map['log_unify'] << {'line' => event.get('line')}
      event.cancel()
      "
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "instanceid"
      timeout => 2
  }
}
output {
    elasticsearch {
        hosts => ["http://#REMOTE_ELASTICHOST#:9200"]
        index => "test_adfs"
        sniffing => true
        manage_template => false
    }
}

The multiline plugin will not help me, since I will receive events constantly from 4 different servers.
If someone comes up with how to achieve it, I would appreciate it very much.

regards

[EDIT: added code fences (~~~) around each block to make it more readable - @yaauie]

By default, Beats will ship each line as an independent event, so by the time it gets to Logstash, we won't be able to correlate any one line with the other lines.

You will need to configure Beats to send each multiline message as a single event; documentation on how to do so can be found here.

Thanks very much for the information yaauie!

After so much time, I solved it in the following way:
# Two-stage correlation of ADFS events: first group by Activity ID, then group
# the results (and the remaining events) by Instance ID.
input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  # Capture the 35-36 characters that follow "Activity ID: " into activityid.
  grok {
    match => [
      "message", "(?<activityid>(?<=Activity ID:\s)(.{35,36}))"
    ]
    break_on_match => false
  }
  # Stage 1: collect every message sharing an activityid into map['line'],
  # drop the individual events, and push the map as a new event on timeout.
  aggregate {
    task_id => "%{activityid}"
    code => "
      map['line'] ||= []
      map['line'] << {'message' => event.get('message')}
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "activityid"
    timeout => 3
  }
  # Capture the Instance ID from either the aggregated line field or the raw
  # message; both "Instance ID: " and "entry with Instance ID " prefixes match.
  grok {
    match => [
      "line", "(?<instanceid>(?<=Instance ID:\s|entry with Instance ID )(.{35,36}))",
      "message", "(?<instanceid>(?<=Instance ID:\s|entry with Instance ID )(.{35,36}))"
    ]
    break_on_match => false
  }
  # Stage 2: collect the line and message values sharing an instanceid into
  # map['final_line'] and push the map as a new event on timeout. This timeout
  # is longer than the stage 1 timeout (6 vs 3).
  aggregate {
    task_id => "%{instanceid}"
    code => "
      map['final_line'] ||= []
      map['final_line'] << {'line' => event.get('line')}
      map['final_line'] << {'message' => event.get('message')}
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "instanceid"
    timeout => 6
  }
}
output {
  elasticsearch {
    hosts => ["HostElastic:9200"]
    index => "test_adfs"
    sniffing => true
    manage_template => false
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.