Can't parse the log

Greetings, guys!

I should say up front that I am a network engineer who works with network and server environments. Sadly, I am not a programmer; to be honest, I am really bad at programming logic.

I have a question about how to parse a log file correctly. The log file I have looks like this:
Apr 11 11:45:01 firepower SFIMS: Protocol: TCP, SrcIP: 172.31.7.74, OriginalClientIP: ::, DstIP: 172.31.11.2, SrcPort: 53064, DstPort: 443, TCPFlags: 0x0, IngressInterface: eth1, DE: Primary Detection Engine (3b4821d2-0dfb-11e7-ac66-d3a8ab9358ce), Policy: Policy-Intact, ConnectType: Start, AccessControlRuleName: Unknown, AccessControlRuleAction: Allow, Prefilter Policy: Unknown, UserName: No Authentication Required, InitiatorPackets: 2, ResponderPackets: 1, InitiatorBytes: 126, ResponderBytes: 66, NAPPolicy: Balanced Security and Connectivity, DNSResponseType: No Error, Sinkhole: Unknown, URLCategory: Unknown, URLReputation: Risk unknown

I'm looking for a filter and code that will help me turn this mess into something like this:

Date: Apr 11 11:45:01
Protocol: TCP // It can also be UDP. I solved this with the grok pattern PROTO \b(?:TCP?|UDP?)\b . Maybe there is a less complex way?
SrcIP: 172.31.7.74
DstIP: 172.31.11.2
SrcPort: 53064
DstPort: 443
URL: http://example.local // This string doesn't exist in the sample log line above, but it might appear; I want the filter to look for it and show it when it exists.

You can see that some values in the log are empty (e.g. OriginalClientIP). I need to know how to write my filter so that the pattern works whether or not OriginalClientIP has a value.

P.S. Sorry for my bad English. Sorry for creating a topic; I just lost hope trying to understand everything at the same time...

__

UPD:
Looks like kv is what I need.

The problem now is that values sometimes have an extra "," character, which is not needed.

Also, I want to drop all output except Date, Protocol, SrcIP, DstIP, SrcPort, DstPort and URL.

My pipe.conf:

input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:Date} %{WORD} %{WORD}\: %{GREEDYDATA:msg}" }
  }
  kv {
    source => "msg"
    value_split => ":"
    target => "kv"
  }
}
output {
  stdout { codec => rubydebug }
}

And output is as follows:

 "offset" => 9532429,
    "input_type" => "log",
        "source" => "/var/log/172.31.11.21/syslog.log",
            "kv" => {
                         "Policy" => [
            [0] "Policy-Intact,",
            [1] "Unknown,"
        ],
                             "DE" => "Primary",
                      "NAPPolicy" => "Balanced",
        "AccessControlRuleAction" => "Allow,",
               "IngressInterface" => "eth1,",
                 "InitiatorBytes" => "2372,",
                       "Sinkhole" => "Unknown,",
               "InitiatorPackets" => "8,",
                    "URLCategory" => "Uncategorized,",
                            "URL" => "https://firepower",
                    "ConnectType" => "End,",
                       "Protocol" => "TCP,",
                         "Client" => "SSL",
            "ApplicationProtocol" => "HTTPS,",
                        "SrcPort" => "52929,",
                       "UserName" => "No",
                  "URLReputation" => "Risk",
                        "DstPort" => "443,",
                 "ResponderBytes" => "2904,",
                          "SrcIP" => "172.31.4.50,",
                "DNSResponseType" => "No",
                       "TCPFlags" => "0x0,",
               "ResponderPackets" => "9,",
                          "DstIP" => "172.31.11.20,",
          "AccessControlRuleName" => "test,",
               "OriginalClientIP" => "::,"
    },
       "message" => "Apr 12 12:30:54 firepower SFIMS: Protocol: TCP, SrcIP: 172.31.4.50, OriginalClientIP: ::, DstIP: 172.31.11.20, SrcPort: 52929, DstPort: 443, TCPFlags: 0x0, IngressInterface: eth1, DE: Primary Detection Engine (3b4821d2-0dfb-11e7-ac66-d3a8ab9358ce), Policy: Policy-Intact, ConnectType: End, AccessControlRuleName: test, AccessControlRuleAction: Allow, Prefilter Policy: Unknown, UserName: No Authentication Required, Client: SSL client, ApplicationProtocol: HTTPS, InitiatorPackets: 8, ResponderPackets: 9, InitiatorBytes: 2372, ResponderBytes: 2904, NAPPolicy: Balanced Security and Connectivity, DNSResponseType: No Error, Sinkhole: Unknown, URLCategory: Uncategorized, URLReputation: Risk unknown, URL: https://firepower",
          "type" => "log",
          "Date" => "Apr 12 12:30:54",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
    "@timestamp" => 2017-04-12T12:30:50.066Z,
      "@version" => "1",
          "beat" => {
        "hostname" => "elastic",
            "name" => "elastic",
         "version" => "5.3.0"
    },
          "host" => "elastic"
}
{
           "msg" => "Protocol: UDP, SrcIP: 172.31.11.21, OriginalClientIP: ::, DstIP: 46.46.160.130, SrcPort: 123, DstPort: 123, TCPFlags: 0x0, IngressInterface: eth1, DE: Primary Detection Engine (3b4821d2-0dfb-11e7-ac66-d3a8ab9358ce), Policy: Policy-Intact, ConnectType: Start, AccessControlRuleName: Unknown, AccessControlRuleAction: Allow, Prefilter Policy: Unknown, UserName: No Authentication Required, Client: NTP client, ApplicationProtocol: NTP, InitiatorPackets: 1, ResponderPackets: 0, InitiatorBytes: 90, ResponderBytes: 0, NAPPolicy: Balanced Security and Connectivity, DNSResponseType: No Error, Sinkhole: Unknown, URLCategory: Unknown, URLReputation: Risk unknown",

You can use the grok debugger to sort this out

http://grokdebug.herokuapp.com/

You can use this format as a starter

%{SYSLOGTIMESTAMP:timestamp} %{WORD:NA01} %{WORD:NA02}: Protocol: %{WORD:protocol}, SrcIP: %{IP:srcip}

Greetings, mkorayem!

I am currently working with this tool, but I simply don't understand how to deal with dynamic changes in the log (as I said, OriginalClientIP sometimes has a value and sometimes does not). I also don't understand how to look for the URL, which may be present in the log from time to time.

Hi Anton,

Actually, I am a newbie to ELK myself. I am trying to make it our central log aggregation system for all Windows and Linux logs, systems and services.

If a field may or may not be there, you can use this pattern: (?:%{IP:originalclientip})?

and here you can find a lot of patterns you can use out of the box
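
To illustrate the optional-group idea on this particular log, here is a minimal sketch, not a tested configuration. It assumes the fields always appear in the order shown in the sample line and that URL, when present, is the last field. The OriginalClientIP value is skipped with a plain [^,]* rather than parsed, which is a simplification chosen for this example.

```
filter {
  grok {
    # (?:, URL: ...)? makes the trailing URL field optional.
    # %{DATA} is non-greedy, so it stops before ", URL: " when that field exists.
    match => { "message" => "%{SYSLOGTIMESTAMP:Date} %{WORD} %{WORD}: Protocol: %{WORD:Protocol}, SrcIP: %{IP:SrcIP}, OriginalClientIP: [^,]*, DstIP: %{IP:DstIP}, SrcPort: %{INT:SrcPort}, DstPort: %{INT:DstPort}%{DATA}(?:, URL: %{NOTSPACE:URL})?$" }
  }
}
```

If the field order can change between events, a positional pattern like this will stop matching.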

Most of the message can be treated as a key/value list that you can use a kv filter to parse. Use a grok filter to separate the timestamp and whatever other prefixes you have from the key/value list, then run a kv filter on the remainder.

Regarding OriginalClientIP, you can use a grok pattern something like this:

Your log pattern:

OriginalClientIP: ::,

I guess when it has IP, it will look like this?

OriginalClientIP: 1.1.1.1,

So the GROK pattern in this case would be,

((?:\:\:,?)|(?:%{IP:OrigCliIP})?),

It works in grok debugger online tool.

Dear Magnus, thank you very much for the advice!
Looks like kv is what I need.

Hello, raamee!

I have found that the kv filter works almost as I expected. The problem now is that I need to delete all output except the few values that I need. I have also found that my output is not perfect and some values have a trailing ",".

              "NAPPolicy" => "Balanced",
"AccessControlRuleAction" => "Allow,",
       "IngressInterface" => "eth1,",
         "InitiatorBytes" => "2372,",
               "Sinkhole" => "Unknown,",
       "InitiatorPackets" => "8,",
            "URLCategory" => "Uncategorized,",
                    "URL" => "https://firepower",
            "ConnectType" => "End,",
               "Protocol" => "TCP,",
                 "Client" => "SSL",
    "ApplicationProtocol" => "HTTPS,",
                "SrcPort" => "52929,",
               "UserName" => "No",
          "URLReputation" => "Risk",
                "DstPort" => "443,",

I have found that the kv filter works almost as I expected. The problem now is that I need to delete all output except the few values that I need.

Use the prune filter.
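
A minimal sketch of what that could look like, assuming the kv filter writes its fields to the top level of the event (no target option), because prune only operates on top-level fields. The whitelist entries are regular expressions; keeping @timestamp in the list is an assumption about what the outputs downstream need.

```
filter {
  kv {
    source => "msg"
    value_split => ":"
    trim_value => ","
  }
  prune {
    # Every top-level field whose name matches none of these regexes is removed.
    whitelist_names => [ "^@timestamp$", "^Date$", "^Protocol$", "^SrcIP$", "^DstIP$", "^SrcPort$", "^DstPort$", "^URL$" ]
  }
}
```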

I have also found that my output is not perfect and some values have a trailing ",".

Perhaps the kv filter can't deal with keys containing spaces.
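
A related possibility, judging by the output above: kv splits fields on whitespace by default, so a value such as "Primary Detection Engine (...)" is cut at the first space and the commas stay attached to the values. Below is a sketch under that assumption, splitting on commas and trimming the leftover spaces. It has not been run against this feed, and the exact trimming behaviour may differ between kv plugin versions.

```
filter {
  kv {
    source => "msg"
    field_split => ","   # fields are separated by commas, not spaces
    value_split => ":"
    trim_key => " "      # drop the space left after each comma
    trim_value => " "    # drop the space left after each colon
    target => "kv"
  }
}
```

With this split, multi-word keys such as "Prefilter Policy" and multi-word values such as "No Authentication Required" should stay in one piece.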

Thank you very much for the help, kv was the answer.

kv {
  source => "msg"
  include_keys => ["Protocol", "SrcIP", "DstIP", "SrcPort", "DstPort", "URL"]
  value_split => ":"
  trim_key => ","
  trim_value => ","
  target => "kv"
}

Thanks, everyone, for the replies!

For my case, the configuration should look like this:

input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:Date} %{WORD} %{WORD}\: %{GREEDYDATA:msg}" }
  }
  kv {
    source => "msg"
    include_keys => ["Protocol", "SrcIP", "DstIP", "SrcPort", "DstPort", "URL"]
    value_split => ":"
    trim_key => ","
    trim_value => ","
    target => "kv"
  }
}
output {
  stdout { codec => rubydebug }
}