Symantec Message Gateway Multiline Logs Parsing

Hi Community,

I have multiline logs from Symantec Message Gateway (an email gateway).

{"port":49542,"host":"10.10.x.x","type":"smg","message":"<158>Mar 2 16:21:59 smtp01 bmserver: 1614684119|0a0a5199-534c370000006c49-29-603e1fd7fcf8|MSG_SIZE|3682","@timestamp":"2021-03-04T07:57:47.190Z","@version":"1"}

The problem is that I can't combine the log lines that contain the same value of "0a0a5199-534c370000006c49-29-603e1fd7fcf8". I'm using a regex pattern like this one:

input {
  tcp {
    port => 5097
    type => "smg"
    codec => multiline {
      pattern => "\d+|(.*?)|"
      what => "next"
    }
  }
}

What it is supposed to do: check that there are one or more digits, and after the digits there must be a pipe "|", with a value between the two pipes "|" and "|". Once that value is matched, combine all the lines that contain the same unique id, such as "0a0a5199-534c370000006c49-29-603e1fd7fcf8".

This identifier, "0a0a5199-534c370000006c49-29-603e1fd7fcf8", changes from message to message, but I am unable to combine the log lines based on it.

These are the log entries where the unique identifier is "0a0a5199-534c370000006c49-29-603e1fd7fcf8":

{"port":49542,"host":"10.10.x.x","type":"smg","message":"<158>Mar 2 16:21:59 smtp01 bmserver: 1614684119|0a0a5199-534c370000006c49-29-603e1fd7fcf8|MSG_SIZE|3682","@timestamp":"2021-03-04T07:57:47.190Z","@version":"1"}
{"port":49542,"host":"10.10.x.x","type":"smg","message":"<158>Mar 2 16:21:59 smtp01 bmserver: 1614684119|0a0a5199-534c370000006c49-29-603e1fd7fcf8|EHLO|hostname_here","@timestamp":"2021-03-04T07:57:47.190Z","@version":"1"}
{"port":49542,"host":"10.10.x.x","type":"smg","message":"<158>Mar 2 16:21:59 smtp01 bmserver: 1614684119|0a0a5199-534c370000006c49-29-603e1fd7fcf8|LOGICAL_IP|10.10.81.133","@timestamp":"2021-03-04T07:57:47.190Z","@version":"1"}
{"port":49542,"host":"10.10.x.x","type":"smg","message":"<158>Mar 2 16:21:59 smtp01 bmserver: 1614684119|0a0a5199-534c370000006c49-29-603e1fd7fcf8|UNTESTED|xyz@gmail.com|submission|spam|bulk|newsletter|suspicious_url|gray|safe|opl|has_urls|unscannable_pmc|content_740|content_1423808626610|content_1532019171118|content_500|content_1542184136209|content_1614087423761|content_720|content_750|content_600|content_1454394469379|content_1530882675851|content_1543489360725|content_1548324806784|content_1415274004379|content_700|content_1569318369896|content_730|content_760|content_1569928837176|content_1548326156991|content_1507892598349|content_520|content_521|content_710|sys_deny_ip|sys_allow_ip|sys_allow_email|sys_deny_email|dns_allow|dns_deny|user_allow|user_deny|freq_va|freq_dha|freq_sa|connection_class_0|connection_class_1|connection_class_2|connection_class_3|connection_class_4|connection_class_5|connection_class_6|connection_class_7|connection_class_8|connection_class_9|senderauth_batv_sign|senderauth_batv_fail|blockedlang|knownlang","@timestamp":"2021-03-04T07:57:47.190Z","@version":"1"}

And here are the log entries where the unique identifier is 0a0a5199-534c370000006c49-2b-603e1fdc40df:

{"port":49542,"host":"10.10.x.x","type":"smg","message":"<158>Mar 2 16:22:04 smtp01 bmserver: 1614684124|0a0a5199-534c370000006c49-2b-603e1fdc40df|MSG_SIZE|4447","@timestamp":"2021-03-04T07:57:47.190Z","@version":"1"}
{"port":49542,"host":"10.10.x.x","type":"smg","message":"<158>Mar 2 16:22:04 smtp01 bmserver: 1614684124|0a0a5199-534c370000006c49-2b-603e1fdc40df|EHLO|hostname","@timestamp":"2021-03-04T07:57:47.190Z","@version":"1"}
{"port":49542,"host":"10.10.x.x","type":"smg","message":"<158>Mar 2 16:22:04 smtp01 bmserver: 1614684124|0a0a5199-534c370000006c49-2b-603e1fdc40df|LOGICAL_IP|10.10.x.x","@timestamp":"2021-03-04T07:57:47.190Z","@version":"1"}

Desired Output
There should be only two log entries, because there are only two unique identifiers. How can I achieve it? Please help ...

That will match any line that contains a number, or any line that contains anything, which means it will match every line: | is used for alternation ("or") in a regexp. It is possible you meant "\d+\|(.*?)\|", but even then the codec does not match on the value of the capture group. It will still match any number followed by a pipe, followed by anything, followed by another pipe.
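A quick Ruby sketch of the difference (Ruby regexps are used here only for illustration; the strings and values are made up):

```ruby
# The unescaped pattern has three alternatives: digits, OR a lazy capture
# that can match nothing, OR the empty string, so it matches every line.
unescaped = /\d+|(.*?)|/
escaped   = /\d+\|(.*?)\|/   # pipes escaped: digits, literal |, value, literal |

puts("no pipes at all" =~ unescaped)   # matches (via the empty alternative)
puts("no pipes at all" =~ escaped)     # nil, no match
puts("1614684119|0a0a5199-534c370000006c49-29-603e1fd7fcf8|MSG_SIZE|3682" =~ escaped)  # matches
```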

I suggest you try an aggregate filter. Take a look at example 3.

Thank you for your prompt reply.

I've explored the aggregate filter, but it doesn't resolve the issue. The aggregate filter requires two options, code and task_id. The code option lets us run Ruby against a map or event to aggregate the fields extracted using grok or some other filter.

The output is only one field that contains the aggregated result as a number, like 2+2-2=2.

But my issue is combining the multiline logs into a single event, because the SMG log source sends multiline logs that are meaningless unless you combine the multiple lines into one. This can be achieved using the unique identifier.

Here is the Logstash config:

input {
  file {
    path => "/home/zafar.iqbal/logstash/logstash-7.11.1/bin/aggregate.log"
    sincedb_path => "/dev/null"
    start_position => "beginning"
    type => "smg"
  }
}

filter {
  if [type] == "smg" {
    mutate { rename => {"host" => "[host][ip]"} remove_field => ["host"] }
    grok {
      match => {"message" => "<%{POSINT}>%{MONTH:month} %{MONTHDAY:day} %{NOTSPACE:time} %{DATA:[host][name]} %{GREEDYDATA:[event][original]}"}
    } # extracted the raw payload in the event.original field
    grok {
      match => {"[event][original]" => "%{DATA:[event][module]}: %{NUMBER}|%{DATA:unique_id}|"}
    } # got the unique id value in the field "unique_id"
    mutate { remove_field => ["[host][ip]", "month", "day", "time", "[host][name]", "[event][module]", "message"] }
    aggregate {
      task_id => "%{unique_id}"
      code => "map['variable_name'] = 0"
      #push_map_as_event_on_timeout => true
      #timeout_task_id_field => "user_id"
      #timeout => 30 # 1 hour timeout, user activity will be considered finished one hour after the first event, even if events keep coming
      #inactivity_timeout => 30 # 5 minutes timeout, user activity will be considered finished if no new events arrive 5 minutes after the last event
      #timeout_tags => ['_aggregatetimeout']
      #timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
    } # end aggregate filter
  } # end main if condition
}

output {
  if [type] == "smg" {
    stdout { codec => rubydebug }
  }
}

You can add anything you want to the map, then push the contents of the map as a new event on timeout.

With those messages you could parse out items using grok

grok {
    pattern_definitions => { "GUID" => "%{BASE16NUM}-%{BASE16NUM}-%{BASE16NUM}-%{BASE16NUM}" }
    break_on_match => false
    match => {
        "message" => [
            "\|%{GUID:guid}\|",
            "MSG_SIZE\|%{INT:msgsize}",
            "LOGICAL_IP\|%{IPV4:clientIp}",
            "\|%{WORD:command}\|%{HOSTNAME:clientHost}$"
        ]
    }
}
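Expanded, the custom GUID pattern is just four hex runs joined by hyphens. A rough Ruby equivalent (\h matches one hex digit; this is an approximation of the grok pattern, not the exact compiled regexp):

```ruby
guid = /\h+-\h+-\h+-\h+/  # four BASE16NUM-like runs separated by hyphens
line = "1614684119|0a0a5199-534c370000006c49-29-603e1fd7fcf8|MSG_SIZE|3682"
puts line[guid]  # extracts "0a0a5199-534c370000006c49-29-603e1fd7fcf8"
```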

then aggregate them based on the id

aggregate {
    task_id => "%{guid}"
    code => '
        map["@timestamp"] ||= event.get("@timestamp")
        map["port"] ||= event.get("port")
        map["host"] ||= event.get("host")
        map["type"] ||= event.get("type")
        map["msgsize"] ||= event.get("msgsize")
        map["clientIp"] ||= event.get("clientIp")
        map["command"] ||= event.get("command")
        map["clientHost"] ||= event.get("clientHost")
    '
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "guid"
    timeout => 10
}
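Note that ||= only assigns when the map entry is still nil, so each field keeps the value from the first event that carried it. A plain-Ruby sketch of that merge behaviour, using made-up events:

```ruby
map = {}
# Two parsed events for the same guid, each carrying different fields
events = [
  { "msgsize" => "3682" },                                # from the MSG_SIZE line
  { "msgsize" => "9999", "clientIp" => "10.10.81.133" }   # later line; its msgsize is ignored
]
events.each do |event|
  map["msgsize"]  ||= event["msgsize"]    # first non-nil value wins
  map["clientIp"] ||= event["clientIp"]
end
# map now holds msgsize "3682" (first seen) and clientIp "10.10.81.133"
```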

Alternatively, if you want all of the messages

        map["messages"] ||= []
        map["messages"] << event.get("message")
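In plain Ruby, that accumulation idiom behaves like this (sample messages are made up):

```ruby
map = {}
["MSG_SIZE|3682", "EHLO|hostname", "LOGICAL_IP|10.10.x.x"].each do |msg|
  map["messages"] ||= []        # create the array on the first event only
  map["messages"] << msg        # append every event's raw message
end
# map["messages"] now contains all three lines in arrival order
```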
