Parse multiline xml via syslog


(Eric Mueller) #1

Hello,

I am trying to parse the following log below. I assume I will need combination of grok to grab the timestamp and then using multiline codec with xml parser. I shrunk the output but I can be about a few hundred IP addresses resulting over a 1000 lines.

2018 Jun 14 16:56:32,305  INFO [Thread-4] (LogResponse:logResponse) - <ssc xmlns="http://123.com/security/ssc">
  <response>
    <service>ddos</service>
    <state>
      <status>complete</status>
    </state>
    <query-response>
      <generic-device-responses>
        <generic-device-response>
          <ptnii-equip-name>ROUTERNAME</ptnii-equip-name>
          <raw-device-responses>
            <raw-device-response><![CDATA[<![CDATA[<?xml version="1.0" encoding="UTF-8"?><rpc-reply xmlns:junos="http://xml.juniper.net/junos/15.1F5/junos">
<firewall-information xmlns="http://xml.juniper.net/junos/15.1F5/junos-filter">
<filter-information>
<filter-name>__flowspec_default_inet__</filter-name>
<counter>
<counter-name>1.0.0.0,*</counter-name>
<packet-count>0</packet-count>
<byte-count>0</byte-count>
</counter>
<counter>
<counter-name>1.0.12.1,*</counter-name>
<packet-count>0</packet-count>
<byte-count>0</byte-count>
</counter>
</filter-information>
</firewall-information>
</rpc-reply>]]]]>><![CDATA[]]></raw-device-response>
          </raw-device-responses>
        </generic-device-response>
      </generic-device-responses>
    </query-response>
  </response>
</ssc>

Here is the logstash filter I was using before the xml was sent via syslog. The syslog line seems to be throwing me off I think. Not sure if I am doing this the right way or not. any help would be much appreciated.

input {
  stdin {
    codec => multiline {
      pattern => "(^<rpc-reply*) | (^<firewall-information*) | (^<filter-information*) "
      negate => "true"
      what => "previous"
      max_lines => 700
  }
}
}

filter {
    xml {
        store_xml => "false"
        source => "message"
        remove_namespaces => "true"
        remove_field => "message"

        xpath => [
            "//ptnii-equip-name/text()","equip_name",
            "//counter-name/text()","dest_address",
            "//packet-count/text()","packet_count",
            "//byte-count/text()","byte_count"
        ]
    }
}

output {
    stdout { codec => rubydebug }
}

(Eric Mueller) #2

It looks like the log did not get pasted correctly. I have attached it here.

    2018 Jun 14 16:56:32,305  INFO [Thread-4] (LogResponse:logResponse) - <ssc xmlns="http://123.com/security/ssc">
  <response>
    <service>ddos</service>
    <state>
      <status>complete</status>
    </state>
    <query-response>
      <generic-device-responses>
        <generic-device-response>
          <ptnii-equip-name>ROUTERNAME</ptnii-equip-name>
          <raw-device-responses>
            <raw-device-response><![CDATA[<![CDATA[<?xml version="1.0" encoding="UTF-8"?><rpc-reply xmlns:junos="http://xml.juniper.net/junos/15.1F5/junos">
<firewall-information xmlns="http://xml.juniper.net/junos/15.1F5/junos-filter">
<filter-information>
<filter-name>__flowspec_default_inet__</filter-name>
<counter>
<counter-name>1.0.0.0,*</counter-name>
<packet-count>0</packet-count>
<byte-count>0</byte-count>
</counter>
<counter>
<counter-name>1.0.12.1,*</counter-name>
<packet-count>0</packet-count>
<byte-count>0</byte-count>
</counter>
</filter-information>
</firewall-information>
</rpc-reply>]]]]>><![CDATA[]]></raw-device-response>
          </raw-device-responses>
        </generic-device-response>
      </generic-device-responses>
    </query-response>
  </response>
</ssc>

(Magnus Bäck) #3

I'm not sure what you're trying to do with this multiline configuration. The logic you're looking for is "if the current line does not begin with a timestamp, join with the previous line".


(Eric Mueller) #4

Thanks for the response. I want to parse the following fields.

ptnii-equip-name (there is only one ptnii-equip-name entry for the whole log message)
counter-name (this will contain source and destination IP contained in the traffic flow)
packet-count (this is the packet count for the traffic flow)
byte-count (this is the byte count for the traffic flow)

I tried the following multiline configuration (with many variations) based on your input but logstash is still not combining all lines into one message to be parsed. It looks like the xml portion is parsing the fields but they are all parsed as separate messages.

input {
  stdin {
    codec => multiline {
      pattern => "^%{SYSLOGTIMESTAMP}"
      negate => "false"
      what => "previous"
  }
}
}

Please help, thanks!


(Magnus Bäck) #5

Your timestamp doesn't match SYSLOGTIMESTAMP. Try ^%{YEAR} %{MONTH}\s+%{MONTHDAY} %{TIME}\s+%{LOGLEVEL}, assuming your log doesn't actually have leading whitespace as in the example you posted.


(Eric Mueller) #6

Thanks for the response.

I thought SYSLOGTIMESTAMP would work since I tested it on the grok debugger. I tried your recommendation but same result. Logstash is still parsing each line as a separate line.

Any ideas?

input {
  stdin {
    codec => multiline {
      pattern => "^%{YEAR} %{MONTH}\s+%{MONTHDAY} %{TIME}\s+%{LOGLEVEL}"
      negate => "false"
      what => "previous"
  }
}
}

(Eric Mueller) #7

There is a small update regarding how the log will be processed and sent to logstash. There will NOT be a syslog line for every query response from a router. The application will actually send multiple router responses in a single syslog message. I assume I can no longer use the syslog timestamp as pattern since logstash will not know when one response ends and where one starts when multiple responses are sent in a single log. I would think i would need to change my pattern to look like this....

input {
  stdin {
    codec => multiline {
      pattern => "<generic-device-response>"
      negate => "false"
      what => "previous"
  }
}
}

There are multiple spaces before the tag. Also, I would assume I need to match on this tag since I would like to parse the ptnii name / router name.

When I use this filter, it still does not work. Logstash is trying parse every line singly.

any ideas?

thanks!


(Magnus Bäck) #8

The negate option must be true so that the logic becomes "if the line does not begin with a timestamp, join with the previous line".


(Eric Mueller) #9

I don't think I can match on the timestamp based on my last update. There can be multiple query responses in a single log so I now must match on something different. Do you have a recommendation?

If I use the below filter, I only receive the output previous to the generic-device-response tag.

input {
  stdin {
    codec => multiline {
      pattern => "<generic-device-response>"
      negate => "true"
      what => "previous"
  }
}
}

Here is what the filter returns....

{
      "@version" => "1",
          "host" => "shdw01tool.ddostier4.att.net",
    "@timestamp" => 2018-06-18T20:20:19.664Z,
       "message" => "2018 Jun 14 16:56:32,305  INFO [Thread-4] (LogResponse:logResponse) - <ssc xmlns=\"http://123.com/security/ssc\">\n  <response>\n    <service>ddos</service>\n    <state>\n      <status>complete</status>\n    </state>\n    <query-response>\n      <generic-device-responses>",
          "tags" => [
        [0] "multiline"
    ]
}

thanks for the help!


(Magnus Bäck) #10

There can be multiple query responses in a single log so I now must match on something different.

Can you give an example of this?


(Eric Mueller) #11

OK, I think matching on the timestamp will be fine, I was thinking that since we are matching on the timestamp as a new syslog that logstash would need to see this for every query response.
When there are multiple responses in a single syslog, there will be multiple generic-device-response tags under generic-device-responses.

2018 Jun 14 16:56:32,305  INFO [Thread-4] (LogResponse:logResponse) - <ssc xmlns="http://123.com/security/ssc">
  <response>
    <service>ddos</service>
    <state>
      <status>complete</status>
    </state>
    <query-response>
      <generic-device-responses>
        <generic-device-response>
          <ptnii-equip-name>ROUTERNAME1</ptnii-equip-name>
          <raw-device-responses>
            <raw-device-response><![CDATA[<![CDATA[<?xml version="1.0" encoding="UTF-8"?><rpc-reply xmlns:junos="http://xml.juniper.net/junos/15.1F5/junos">
<firewall-information xmlns="http://xml.juniper.net/junos/15.1F5/junos-filter">
<filter-information>
<filter-name>__flowspec_default_inet__</filter-name>
<counter>
<counter-name>1.0.0.0,9.9.0.1</counter-name>
<packet-count>1200</packet-count>
<byte-count>1500</byte-count>
</counter>
<counter>
<counter-name>1.0.12.1,9.9.0.0</counter-name>
<packet-count>1290</packet-count>
<byte-count>2300</byte-count>
</counter>
</filter-information>
</firewall-information>
</rpc-reply>]]]]>><![CDATA[]]></raw-device-response>
          </raw-device-responses>
        </generic-device-response>
		        <generic-device-response>
		          <ptnii-equip-name>ROUTERNAME2</ptnii-equip-name>
		          <raw-device-responses>
		            <raw-device-response><![CDATA[<![CDATA[<?xml version="1.0" encoding="UTF-8"?><rpc-reply xmlns:junos="http://xml.juniper.net/junos/15.1F5/junos">
<firewall-information xmlns="http://xml.juniper.net/junos/15.1F5/junos-filter">
<filter-information>
<filter-name>__flowspec_default_inet__</filter-name>
<counter>
<counter-name>1.0.0.0,90.91.92.93</counter-name>
<packet-count>2109</packet-count>
<byte-count>3400</byte-count>
</counter>
<counter>
<counter-name>1.0.12.1,90.96.96.90</counter-name>
<packet-count>34000</packet-count>
<byte-count>5698</byte-count>
</counter>
</filter-information>
</firewall-information>
</rpc-reply>]]]]>><![CDATA[]]></raw-device-response>
          </raw-device-responses>
        </generic-device-response>
      </generic-device-responses>
    </query-response>
  </response>
</ssc>

That being said, I tried the following input using timestamp with negate as true. I get no response from logstash.

input {
  stdin {
    codec => multiline {
      pattern => "^%{YEAR} %{MONTH}\s+%{MONTHDAY} %{TIME}\s+%{LOGLEVEL}*"
      negate => "true"
      what => "previous"
  }
}
}

(Magnus Bäck) #12

When there are multiple responses in a single syslog, there will be multiple generic-device-response tags under generic-device-responses.

Yes, but that's something we deal with with a proper XML parser (i.e. the xml filter). Never ever parse XML using regular expressions.

That being said, I tried the following input using timestamp with negate as true. I get no response from logstash.

Does your test input contain more than one message? Regardless it's wise to set the multiline codec's auto_flush_interval option to something reasonably low so that Logstash doesn't wait forever for the next timestamp before flushing what it's got queued up. When using the stdin input it appears Logstash shuts down before auto_flush_interval kicks in so make sure you're testing with >1 message as test input.


(Eric Mueller) #13

I am using a single query response as provided in my first post. I have tried adding the auto_flush_interval but still no joy.

any other ideas?


(Magnus Bäck) #14

I am using a single query response as provided in my first post.

Not entirely sure what you mean by that. Whenever possible use examples instead of describing what you're testing.

This works for me (data is a file containing two identical log entries):

$ cat test.config
input {
  stdin {
    codec => multiline {
      pattern => "^%{YEAR} %{MONTH}\s+%{MONTHDAY} %{TIME}\s+%{LOGLEVEL}"
      negate => "true"
      what => "previous"
      auto_flush_interval => 1
    }
  }
}
output { stdout { codec => rubydebug } }
$ /opt/logstash/bin/logstash -f test.config < data
Settings: Default pipeline workers: 8
Pipeline main started
{
    "@timestamp" => "2018-06-20T06:15:55.157Z",
       "message" => "2018 Jun 14 16:56:32,305  INFO [Thread-4] (LogResponse:logResponse) -\n<ssc xmlns=\"http://123.com/security/ssc\">\n  <response>\n    <service>ddos</service>\n    <state>\n      <status>complete</status>\n    </state>\n    <query-response>\n      <generic-device-responses>\n        <generic-device-response>\n          <ptnii-equip-name>ROUTERNAME1</ptnii-equip-name>\n          <raw-device-responses>\n            <raw-device-response><![CDATA[<![CDATA[<?xml version=\"1.0\"\nencoding=\"UTF-8\"?><rpc-reply\nxmlns:junos=\"http://xml.juniper.net/junos/15.1F5/junos\">\n<firewall-information\nxmlns=\"http://xml.juniper.net/junos/15.1F5/junos-filter\">\n<filter-information>\n<filter-name>__flowspec_default_inet__</filter-name>\n<counter>\n<counter-name>1.0.0.0,9.9.0.1</counter-name>\n<packet-count>1200</packet-count>\n<byte-count>1500</byte-count>\n</counter>\n<counter>\n<counter-name>1.0.12.1,9.9.0.0</counter-name>\n<packet-count>1290</packet-count>\n<byte-count>2300</byte-count>\n</counter>\n</filter-information>\n</firewall-information>\n</rpc-reply>]]]]>><![CDATA[]]></raw-device-response>\n          </raw-device-responses>\n        </generic-device-response>\n\t\t        <generic-device-response>\n\t\t          <ptnii-equip-name>ROUTERNAME2</ptnii-equip-name>\n\t\t          <raw-device-responses>\n\t\t            <raw-device-response><![CDATA[<![CDATA[<?xml\nversion=\"1.0\" encoding=\"UTF-8\"?><rpc-reply\nxmlns:junos=\"http://xml.juniper.net/junos/15.1F5/junos\">\n<firewall-information\nxmlns=\"http://xml.juniper.net/junos/15.1F5/junos-filter\">\n<filter-information>\n<filter-name>__flowspec_default_inet__</filter-name>\n<counter>\n<counter-name>1.0.0.0,90.91.92.93</counter-name>\n<packet-count>2109</packet-count>\n<byte-count>3400</byte-count>\n</counter>\n<counter>\n<counter-name>1.0.12.1,90.96.96.90</counter-name>\n<packet-count>34000</packet-count>\n<byte-count>5698</byte-count>\n</counter>\n</filter-information>\n</firewall-information>\n</rpc-reply>]]]]>><![CDATA[]]></raw-device-response>\n          </raw-device-responses>\n        </generic-device-response>\n      </generic-device-responses>\n    </query-response>\n  </response>\n</ssc>",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "lnxolofon"
}
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}

(Eric Mueller) #15

I apologize, I misread the part where you specified that my data log should have more than 1 message. I now have the multiline config working! thanks so much!

Now we got that out of the way, I added the xpath filters to the logstash configuration file but they do not seem to be matching.

One thing to note is that I am matching on the field counter-name as destination address, but it is really source address and destination address with a comma separator.

For instance here is what it looks like when there is no destination address, an asterisk is in place of the destination IP address.

<counter-name>1.0.0.0,*</counter-name>

Here is what it would look like if it had both source and destination IP addresses. I want to parse these as such.

<counter-name>1.0.0.0,2.0.0.0</counter-name>

When I run the following configuration, I get the same output as if there were no xml filters in place.

input {
  stdin {
    codec => multiline {
      pattern => "^%{YEAR} %{MONTH}\s+%{MONTHDAY} %{TIME}\s+%{LOGLEVEL}"
      negate => "true"
      what => "previous"
      auto_flush_interval => 1
  }
}
}

filter {
    xml {
        store_xml => "false"
        source => "message"
        remove_namespaces => "true"
        remove_field => "message"

        xpath => [
            "//ptnii-equip-name/text()","ptnii_name",
            "//counter-name/text()","dest_address",
            "//packet-count/text()","packet_count",
            "//byte-count/text()","byte_count"
        ]
    }
}

output {
    stdout { codec => rubydebug }
}

(Magnus Bäck) #16

You need a grok or dissect filter to separate the timestamp and other stuff from the XML payload. Right now you're trying to parse "2018 Jun 14 16:56:32,305 INFO [Thread-4]" as XML which obviously doesn't succeed.

Here is what it would look like if it had both source and destination IP addresses. I want to parse these as such.

Sure, but you can't do that with the xml filter. You need an additional filter after that, e.g. a dissect filter.


(Eric Mueller) #17

thanks Magnus!

OK, please let me know if I am going the right direction. I tried to grok the syslog line and then use greedydata to parse the XML. However, I keep getting a grok parse failure. I have tested it on the grok debugger but it still gives an error.

input {
  stdin {
    codec => multiline {
      pattern => "^%{YEAR} %{MONTH}\s+%{MONTHDAY} %{TIME}\s+%{LOGLEVEL}"
      negate => "true"
      what => "previous"
      auto_flush_interval => 1
  }
}
}

filter {

   grok {
        patterns_dir => "./patterns"
        match => ["message", "^%{YEAR} %{MONTH}\s+%{MONTHDAY} %{TIME}\s+%{LOGLEVEL} \[%{THREAD:thread}\] \(%{LOGRESPONSE:logresponse}\) - <ssc xmlns=\"%{URI:uri}\"> %{GREEDYDATA:data}"]
}

   xml {
        store_xml => "false"
        source => "data"
        xpath => [
            "//ptnii-equip-name/text()","ptnii_name",
            "//counter-name/text()","dest_address",
            "//packet-count/text()","packet_count",
            "//byte-count/text()","byte_count"
        ]
    }
}


output {
    stdout { codec => rubydebug }
}

Any ideas on what I am doing wrong here?

I tried removing the XML filter piece but the grok failure messages still shows up, so its in the grok somewhere.

thanks!


(Magnus Bäck) #18

What does an example events produce by Logstash look like, i.e. what is the output of your stdout output?


(Eric Mueller) #19

Here is the output, now that I look closer, its only showing the first log message. I have two log messages in this data file like you suggested in order for me to test the multiline parser. I am not sure why it would fail at the second message.

{
      "@version" => "1",
          "host" => "shdw01tool.ddostier4.att.net",
    "@timestamp" => 2018-06-21T23:49:15.723Z,
       "message" => "2018 Jun 14 16:56:32,305  INFO [Thread-4] (LogResponse:logResponse) - <ssc xmlns=\"http://123.com/security/ssc\">\n  <response>\n    <service>ddos</service>\n    <state>\n      <status>complete</status>\n    </state>\n    <query-response>\n      <generic-device-responses>\n        <generic-device-response>\n          <ptnii-equip-name>ROUTER</ptnii-equip-name>\n          <raw-device-responses>\n            <raw-device-response><![CDATA[<![CDATA[<?xml version=\"1.0\" encoding=\"UTF-8\"?><rpc-reply xmlns:junos=\"http://xml.juniper.net/junos/15.1F5/junos\">\n<firewall-information xmlns=\"http://xml.juniper.net/junos/15.1F5/junos-filter\">\n<filter-information>\n<filter-name>__flowspec_default_inet__</filter-name>\n<counter>\n<counter-name>1.0.0.0,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n<counter>\n<counter-name>1.0.12.1,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n<counter>\n<counter-name>1.125.161.25,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n<counter>\n<counter-name>1.125.161.5,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n<counter>\n<counter-name>10.0.0.0,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n<counter>\n<counter-name>10.10.11.1,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n<counter>\n<counter-name>10.100.100.49,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n<counter>\n<counter-name>10.105.6.200,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n<counter>\n<counter-name>10.11.1.1,*</counter-name>\n<packet-count>14691295769</packet-count>\n<byte-count>21713735146582</byte-count>\n</counter>\n<counter>\n<counter-name>10.15.0.1,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n<counter>\n<counter-name>10.15.1.1,*</counter-name>\n<packet-count>0</packet-count>\n<byte-count>0</byte-count>\n</counter>\n</filter-information>\n</firewall-information>\n</rpc-reply>]]]]>><![CDATA[]]></raw-device-response>\n          </raw-device-responses>\n        </generic-device-response>\n      </generic-device-responses>\n    </query-response>\n  </response>\n</ssc>",
          "tags" => [
        [0] "multiline",
        [1] "_grokparsefailure"
    ]
}

(Magnus Bäck) #20

There's something wrong with the grok filter so the data field with XML to parse it never created. Build your grok expression gradually, starting at the simplest possible (e.g. %{YEAR}). At some point it'll stop working and then you've narrowed things down. I'm suspicious about the LOGRESPONSE pattern.

Also, replace

<ssc xmlns=\"%{URI:uri}\"> %{GREEDYDATA:data}

with

%{GREEDYDATA:data}

since you need the "ssc" opening tag in the XML string you want to parse.