Same uuid or other id type when field content is equal

Hi,

I'm a beginner with Logstash, and there are two messages in my firewall logs that are important to me:

Mar 20 2020 15:38:00 1.1.1.1 : %ASA-6-113039: Group <Group_name> User <username> IP <xxx.xxx.xxx.xxx> AnyConnect parent session started. -----> CONNECTION message

Mar 20 2020 15:38:28 1.1.1.1 : %ASA-4-113019: Group = Group_name, Username = username, IP = xxx.xxx.xxx.xxx, Session disconnected. Session Type: SSL, Duration: 0h:00m:28s, Bytes xmt: 17453, Bytes rcv: 2206, Reason: User Requested. -----> DISCONNECTION message

I'm building a database for these events, so I need to generate a unique session_id shared by both messages. I don't know how to do it, or whether it is even possible.

For example, here is my Logstash filter:

filter {

#If ciscotag is ASA-6-113039 for "started"

if "ASA-6-113039" in [message] {

grok {
  match => { "message" => "%{CISCOTIMESTAMP:timestamp} %{SYSLOGHOST:sysloghost} : %%{CISCOTAG:ciscotag}: Group <%{DATA:policy_id}> User <%{USERNAME:user_id1}> IP <%{IP:public_ip}> %{DATA:client_type} parent %{GREEDYDATA:action}.." }
}

mutate {
  add_field => { "act_type" => "in" }
  rename => { "user_id1" => "user_id" }
 }

uuid {
  #add_field => {"testId" => "@uuid"}
  target => "testId"
 }
}

#If ciscotag is ASA-4-113019 for "disconnected"

else if "ASA-4-113019" in [message] {

grok {
  match => { "message" => "%{CISCOTIMESTAMP:timestamp} %{SYSLOGHOST:sysloghost} : %%{CISCOTAG:ciscotag}: Group = %{DATA:policy_id}, Username = %{USERNAME:user_id2}, IP = %{IP:public_ip}, %{DATA:action}. Session Type: SSL, Duration: %{DATA:duration}, %{GREEDYDATA:message}" }
 }

mutate {
  add_field => { "act_type" => "out" }
  rename => { "user_id2" => "user_id" }
 }

if [user_id1] == [user_id2] {
 mutate { 
  add_tag => {"dest_field" => "teste"}
  copy => { "testId" => "dest_field" }
  }
 }
}

 else { drop{ } }

}

I'm getting these messages in my output file:

{"ciscotag":"ASA-6-113039","host":"1.1.1.1","message":"<174>Mar 25 2020 19:37:14 1.1.1.1 : %ASA-6-113039: Group <GroupPolicy> User <Username> IP <xxx.xxx.xxx.xxx> AnyConnect parent session started.\n","@version":"1","@timestamp":"2020-03-25T22:37:14.857Z","sysloghost":"1.1.1.1","timestamp":"Mar 25 2020 19:37:14","action":"session started","policy_id":"GroupPolicy","testId":"fa1e1bfe-a0a4-4512-80a1-eb9fe6d802da","act_type":"in","user_id":"Username","client_type":"AnyConnect","public_ip":"xxx.xxx.xxx.xxx"}

----> IN message

{"ciscotag":"ASA-4-113019","host":"1.1.1.1","message":["<172>Mar 25 2020 19:43:05 1.1.1.1 : %ASA-4-113019: Group = GroupPolicy, Username = Username, IP = xxx.xxx.xxx.xxx, Session disconnected. Session Type: SSL, Duration: 0h:05m:53s, Bytes xmt: 200491, Bytes rcv: 126724, Reason: User Requested\n","Bytes xmt: 200491, Bytes rcv: 126724, Reason: User Requested\n"],"@version":"1","@timestamp":"2020-03-25T22:43:05.565Z","sysloghost":"1.1.1.1","timestamp":"Mar 25 2020 19:43:05","action":"Session disconnected","policy_id":"GroupPolicy","act_type":"out","tags":["dest_field"],"user_id":"Username","duration":"0h:05m:53s","public_ip":"xxx.xxx.xxx.xxx"}

----> OUT message

What do I need? I need the same testId ("testId":"fa1e1bfe-a0a4-4512-80a1-eb9fe6d802da") in both messages when their "user_id" values are equal.

Could someone please help me?

You may be able to use a fingerprint filter to generate a hash of the username.
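A minimal sketch of what that could look like, assuming the `user_id` field produced by the rename in the filter above. The target field name `user_fp` is a made-up example. The same username always hashes to the same value, so this identifies the user, not an individual session.

```
filter {
  # Place this after the mutate that renames user_id1/user_id2 to user_id.
  fingerprint {
    source => ["user_id"]   # field to hash
    target => "user_fp"     # hypothetical name for the field that receives the hash
    method => "SHA256"      # some older plugin versions may also require a `key` for this method
  }
}
```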

Thank you, Badger, but how would I create it and attach the same value to exactly one IN event and one OUT event? For example:

User1 [fingerprint123] session started <--- IN

User2 [fingerprint456] session started <--- IN

User1 [fingerprint123] session disconnected <--- OUT

User3 [fingerprint789] session started <--- IN

User2 [fingerprint456] session disconnected <--- OUT

User1 [fingerprint321] session started <--- IN

User2 [fingerprint987] session started <--- IN

Do you see what I mean? I need a new ID for each started/disconnected pair.

You need the order of events preserved, and Logstash does not do that by default. You could disable java_execution and set pipeline.workers to 1 to preserve the order, then use an aggregate filter. Look at example 1 in the documentation.
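A rough sketch of that approach, following the create/update pattern of example 1 and not tested against these logs. It assumes the fields from the filter in the question (`act_type`, `user_id`, `testId`), that the uuid filter has already set `testId` on the start event, and that a user has at most one open session at a time. The 86400-second timeout is an arbitrary choice.

```
# Assumed settings in logstash.yml (Logstash 7.x) so events stay in order:
#   pipeline.workers: 1
#   pipeline.java_execution: false

filter {
  if [act_type] == "in" {
    # Session start: remember the UUID generated for this user.
    aggregate {
      task_id    => "%{user_id}"
      code       => "map['testId'] = event.get('testId')"
      map_action => "create"
    }
  } else if [act_type] == "out" {
    # Session end: copy the remembered UUID onto the disconnect event,
    # then discard the map so the next session gets a fresh ID.
    aggregate {
      task_id     => "%{user_id}"
      code        => "event.set('testId', map['testId'])"
      map_action  => "update"
      end_of_task => true
      timeout     => 86400   # assumption: no session lasts longer than one day
    }
  }
}
```

If a disconnect arrives with no stored map (for example after a restart or after the timeout has expired), `map_action => "update"` skips the code and the event is left without a `testId`. If one user can hold several sessions at once, a more specific `task_id` such as `"%{user_id}_%{public_ip}"` may be needed.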
