Can't parse the log

Greetings, guys!

I should say up front that I am a network engineer who works with network and server environments. Sadly, I am not a programmer; to be honest, I am really weak at programming logic.

I have a question about how to parse a log file correctly. The log file I have looks like this:
Apr 11 11:45:01 firepower SFIMS: Protocol: TCP, SrcIP: 172.31.7.74, OriginalClientIP: ::, DstIP: 172.31.11.2, SrcPort: 53064, DstPort: 443, TCPFlags: 0x0, IngressInterface: eth1, DE: Primary Detection Engine (3b4821d2-0dfb-11e7-ac66-d3a8ab9358ce), Policy: Policy-Intact, ConnectType: Start, AccessControlRuleName: Unknown, AccessControlRuleAction: Allow, Prefilter Policy: Unknown, UserName: No Authentication Required, InitiatorPackets: 2, ResponderPackets: 1, InitiatorBytes: 126, ResponderBytes: 66, NAPPolicy: Balanced Security and Connectivity, DNSResponseType: No Error, Sinkhole: Unknown, URLCategory: Unknown, URLReputation: Risk unknown

I'm looking for a filter and code that will help me turn this mess into something that looks like this:

Date: Apr 11 11:45:01
Protocol: TCP // But it can be UDP. I solved this with the grok pattern PROTO \b(?:TCP?|UDP?)\b . Maybe there is a less complex way?
SrcIP: 172.31.7.74
DstIP: 172.31.11.2
SrcPort: 53064
DstPort: 443
URL: http://example.local // This field doesn't exist in the sample log line above, but it might appear, and I want the filter to look for it and show it when it exists.

You can see that some values in the log are empty (e.g. OriginalClientIP). I need to know how to write my filter so that the pattern works whether or not OriginalClientIP has a value.

P.S. Sorry for my bad English, and sorry for creating a topic; I just lost hope trying to understand everything at the same time...

__

UPD:
Looks like kv is what I need.

The problem now is that the values sometimes have an extra "," symbol, which is not needed.

Also, I want to delete all output except Date, Protocol, SrcIP, DstIP, SrcPort, DstPort and URL.

My pipe.conf:

input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:Date} %{WORD} %{WORD}\: %{GREEDYDATA:msg}" }
  }
  kv {
    source => "msg"
    value_split => ":"
    target => "kv"
  }
}
output {
  stdout { codec => rubydebug }
}

And output is as follows:

 "offset" => 9532429,
    "input_type" => "log",
        "source" => "/var/log/172.31.11.21/syslog.log",
            "kv" => {
                         "Policy" => [
            [0] "Policy-Intact,",
            [1] "Unknown,"
        ],
                             "DE" => "Primary",
                      "NAPPolicy" => "Balanced",
        "AccessControlRuleAction" => "Allow,",
               "IngressInterface" => "eth1,",
                 "InitiatorBytes" => "2372,",
                       "Sinkhole" => "Unknown,",
               "InitiatorPackets" => "8,",
                    "URLCategory" => "Uncategorized,",
                            "URL" => "https://firepower",
                    "ConnectType" => "End,",
                       "Protocol" => "TCP,",
                         "Client" => "SSL",
            "ApplicationProtocol" => "HTTPS,",
                        "SrcPort" => "52929,",
                       "UserName" => "No",
                  "URLReputation" => "Risk",
                        "DstPort" => "443,",
                 "ResponderBytes" => "2904,",
                          "SrcIP" => "172.31.4.50,",
                "DNSResponseType" => "No",
                       "TCPFlags" => "0x0,",
               "ResponderPackets" => "9,",
                          "DstIP" => "172.31.11.20,",
          "AccessControlRuleName" => "test,",
               "OriginalClientIP" => "::,"
    },
       "message" => "Apr 12 12:30:54 firepower SFIMS: Protocol: TCP, SrcIP: 172.31.4.50, OriginalClientIP: ::, DstIP: 172.31.11.20, SrcPort: 52929, DstPort: 443, TCPFlags: 0x0, IngressInterface: eth1, DE: Primary Detection Engine (3b4821d2-0dfb-11e7-ac66-d3a8ab9358ce), Policy: Policy-Intact, ConnectType: End, AccessControlRuleName: test, AccessControlRuleAction: Allow, Prefilter Policy: Unknown, UserName: No Authentication Required, Client: SSL client, ApplicationProtocol: HTTPS, InitiatorPackets: 8, ResponderPackets: 9, InitiatorBytes: 2372, ResponderBytes: 2904, NAPPolicy: Balanced Security and Connectivity, DNSResponseType: No Error, Sinkhole: Unknown, URLCategory: Uncategorized, URLReputation: Risk unknown, URL: https://firepower",
          "type" => "log",
          "Date" => "Apr 12 12:30:54",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
    "@timestamp" => 2017-04-12T12:30:50.066Z,
      "@version" => "1",
          "beat" => {
        "hostname" => "elastic",
            "name" => "elastic",
         "version" => "5.3.0"
    },
          "host" => "elastic"
}
{
           "msg" => "Protocol: UDP, SrcIP: 172.31.11.21, OriginalClientIP: ::, DstIP: 46.46.160.130, SrcPort: 123, DstPort: 123, TCPFlags: 0x0, IngressInterface: eth1, DE: Primary Detection Engine (3b4821d2-0dfb-11e7-ac66-d3a8ab9358ce), Policy: Policy-Intact, ConnectType: Start, AccessControlRuleName: Unknown, AccessControlRuleAction: Allow, Prefilter Policy: Unknown, UserName: No Authentication Required, Client: NTP client, ApplicationProtocol: NTP, InitiatorPackets: 1, ResponderPackets: 0, InitiatorBytes: 90, ResponderBytes: 0, NAPPolicy: Balanced Security and Connectivity, DNSResponseType: No Error, Sinkhole: Unknown, URLCategory: Unknown, URLReputation: Risk unknown",

You can use the grok debugger to sort this out

http://grokdebug.herokuapp.com/

You can use this format as a starter

%{SYSLOGTIMESTAMP:timestamp} %{WORD:NA01} %{WORD:NA02}: Protocol: %{WORD:protocol}, SrcIP: %{IP:srcip}

Greetings, mkorayem!

Currently I am working with this tool, but I simply don't understand how to deal with dynamic changes in the log (as I said, OriginalClientIP sometimes has a value and sometimes does not). I also don't understand how to look for the URL, which may appear in the log from time to time.

Hi Anton,

Actually, I am a newbie to ELK. I am trying to make it our central log aggregation system for all Windows and Linux logs, systems and services.

If there is a field that may or may not be there, you can use this pattern: (?:%{IP:originalclientip})?
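
As a rough, untested sketch of how that optional group might sit inside a grok filter: the field name originalclientip and the choice to match only this fragment of the message are just for illustration. Grok does not anchor the expression, so it can match in the middle of the line. How the "::" placeholder from the sample is handled depends on whether the IP pattern accepts it, which I have not checked.

```
filter {
  grok {
    # Intended to match "OriginalClientIP: 1.1.1.1," as well as an empty
    # value ("OriginalClientIP: ,"). The originalclientip field is only
    # set when an IP address is actually present.
    match => { "message" => "OriginalClientIP: (?:%{IP:originalclientip})?," }
  }
}
```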

and here you can find a lot of patterns you can use out of the box

Most of the message can be treated as a key/value list that you can use a kv filter to parse. Use a grok filter to separate the timestamp and whatever other prefixes you have from the key/value list, then run a kv filter on the remainder.
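
A minimal sketch of that two-step approach, assuming the prefix is always "timestamp host program:" as in the sample line. The field names syslog_time and kv_text are made up for the example, and splitting fields on "," instead of the default space is an assumption about what suits this message; it has not been run against real data.

```
filter {
  # Step 1: peel off the syslog prefix and keep the rest in one field.
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_time} %{WORD} %{WORD}: %{GREEDYDATA:kv_text}" }
  }
  # Step 2: treat the remainder as "Key: value" pairs separated by commas.
  kv {
    source      => "kv_text"
    field_split => ","
    value_split => ":"
    # Each pair after the first starts with a space, so strip it.
    trim_key    => " "
    trim_value  => " "
  }
}
```

Splitting on the comma rather than on whitespace should keep multi-word values such as "No Authentication Required" in one piece and avoid the separator ending up inside the value.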


Regarding OriginalClientIP, you can use a grok pattern something like this:

Your log pattern:

OriginalClientIP: ::,

I guess when it has IP, it will look like this?

OriginalClientIP: 1.1.1.1,

So the GROK pattern in this case would be,

((?:\:\:,?)|(?:%{IP:OrigCliIP})?),

It works in the online grok debugger tool.

Dear Magnus, thank you very much for the advice!

Hello, raamee!

I have found that the kv filter works almost as I expected. The problem now is that I need to delete all output except the few values I need. I have also found that my output is not perfect: some values have a trailing ",".

              "NAPPolicy" => "Balanced",
"AccessControlRuleAction" => "Allow,",
       "IngressInterface" => "eth1,",
         "InitiatorBytes" => "2372,",
               "Sinkhole" => "Unknown,",
       "InitiatorPackets" => "8,",
            "URLCategory" => "Uncategorized,",
                    "URL" => "https://firepower",
            "ConnectType" => "End,",
               "Protocol" => "TCP,",
                 "Client" => "SSL",
    "ApplicationProtocol" => "HTTPS,",
                "SrcPort" => "52929,",
               "UserName" => "No",
          "URLReputation" => "Risk",
                "DstPort" => "443,",

I have found that kv filter is working almost as I expected. The problem now is that I need to delete all output except some of the values that I need.

Use the prune filter.
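
A rough sketch of what that could look like, assuming the kv results live under a kv target and the timestamp is in Date, as in the config above. As far as I know, prune only looks at top-level field names, so it can keep or drop the whole kv field but not individual keys inside it. Untested.

```
filter {
  prune {
    # Each entry is a regular expression matched against top-level field names.
    # @timestamp is listed explicitly so that it is not dropped by accident.
    whitelist_names => ["^Date$", "^kv$", "^@timestamp$"]
  }
}
```

To narrow down the keys inside kv itself, the kv filter's include_keys option is probably the better fit.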

Also I have found that my output is not perfect and some values have ",".

Perhaps the kv filter can't deal with keys containing spaces.


Thank you very much for the help, kv was the answer.

kv {
  source => "msg"
  include_keys => ["Protocol", "SrcIP", "DstIP", "SrcPort", "DstPort", "URL"]
  value_split => ":"
  trim_key => ","
  trim_value => ","
  target => "kv"
}

Thanks everyone for the reply!

For my case - the configuration should be like this:

input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:Date} %{WORD} %{WORD}\: %{GREEDYDATA:msg}" }
  }
  kv {
    source => "msg"
    include_keys => ["Protocol", "SrcIP", "DstIP", "SrcPort", "DstPort", "URL"]
    value_split => ":"
    trim_key => ","
    trim_value => ","
    target => "kv"
  }
}
output {
  stdout { codec => rubydebug }
}
