Parsing Nessus XML in Logstash

Hi, I've run into a problem parsing an XML file so it can be displayed in Kibana. Below is my Logstash configuration:

input {
    file {
        path => "/home/user/nessus/*"
        start_position => "beginning"
        tags => "nessus"
        type => "nessus"
    }
}

filter {
    ## Interpret the message as XML
    if [type] == "nessus" {
        xml {
            source => "message"
            store_xml => false

            xpath => [
                "/NessusClientData_v2/Report/@name", "report_name",
                "/NessusClientData_v2/Report/ReportHost/text()", "report_host",
                "/NessusClientData_v2/Report/ReportHost/ReportItem/text()", "report_item",
                "/NessusClientData_v2/Report/ReportHost/@name", "report_host_name",
                "/NessusClientData_v2/Report/ReportHost/HostProperties/tag[@name='HOST_START']/text()", "report_host_start",
                "/NessusClientData_v2/Report/ReportHost/HostProperties/tag[@name='HOST_END']/text()", "report_host_end",
                "/NessusClientData_v2/Report/ReportHost/ReportItem/@port", "report_item_port"
            ]
        }
        mutate {
            remove_field => ["message"]
            replace => {
                "report_host_start" => "%{report_host_start}"
            }
            replace => {
                "report_host_end" => "%{report_host_end}"
            }
            convert => {
                "report_item_severity" => "integer"
            }
            add_field => {
                report_name => "%{report_name}"
                report_host => "%{report_host}"
                report_item => "%{report_item}"
            }

        }
        date {
            match => ["report_host_start", "EEE MMM dd HH:mm:ss yyyy"]
            target => "report_host_start"
            locale => "en_US"
        }
        date {
            match => ["report_host_end", "EEE MMM dd HH:mm:ss yyyy"]
            target => "report_host_end"
            locale => "en_US"
        }
    }
}

output {
    if [@metadata][beat] {
        elasticsearch {
            hosts => ["192.168.1.152:9200"]
            manage_template => false
            index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
            user => elastic
            password => password
        }
    } else {
        elasticsearch {
            hosts => ["192.168.1.152:9200"]
            index => "nessus-data-%{+YYYY.MM.dd}"
            user => elastic
            password => password
        }
        stdout { codec => rubydebug }
    }
}

Here is the tree view of my Nessus XML file:

However, the data is displayed incorrectly in Kibana, not as expected. Here is the processed data in Kibana:

Unless your XML file is all on a single line, you have to use a multiline codec to join the lines of the file into a single event. This is the third time the topic of parsing XML files has come up this week.
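
Something along these lines is the usual approach (a minimal sketch, untested against your file; the pattern assumes the document starts with an XML declaration such as <?xml version="1.0" ?>):

input {
    file {
        path => "/home/user/nessus/*"
        start_position => "beginning"
        codec => multiline {
            # Any line that does NOT start a new document is appended to
            # the previous event, so the whole file becomes a single event.
            pattern => "^<\?xml "
            negate => true
            what => "previous"
        }
        type => "nessus"
    }
}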

        replace => {
            "report_host_start" => "%{report_host_start}"
        }

So... this replaces the contents of the report_host_start field with the contents of the report_host_start field. What are you really trying to accomplish here?

I'm trying to extract the important data from my .nessus file, such as the report name, host name, severity, and so on. I've updated my config file to include the multiline codec, but now it seems no data is being sent to Elasticsearch. Below is my new config file:

input {
    file {
        path => "/home/user/nessus/*"
        start_position => "beginning"
        codec => multiline {
            pattern => "^<\?NessusClientData_v2 .*\>"
            negate => "true"
            what => "previous"
        }
        tags => "nessus"
        type => "nessus"
    }
}

filter {
    ## Interpret the message as XML
    if [type] == "nessus" {
        xml {
            source => "message"
            store_xml => false

            xpath => [
                "/NessusClientData_v2/Report/@name", "report_name",
                "/NessusClientData_v2/Report/ReportHost/text()", "report_host",
                "/NessusClientData_v2/Report/ReportHost/ReportItem/text()", "report_item",
                "/NessusClientData_v2/Report/ReportHost/@name", "report_host_name",
                "/NessusClientData_v2/Report/ReportHost/HostProperties/tag[@name='HOST_START']/text()", "report_host_start",
                "/NessusClientData_v2/Report/ReportHost/HostProperties/tag[@name='HOST_END']/text()", "report_host_end",
                "/NessusClientData_v2/Report/ReportHost/ReportItem/@port", "report_item_port"
            ]
        }
        mutate {
            convert => {
                "report_item_severity" => "integer"
            }

        }
        date {
            match => ["report_host_start", "EEE MMM dd HH:mm:ss yyyy"]
            target => "report_host_start"
            locale => "en_US"
        }
        date {
            match => ["report_host_end", "EEE MMM dd HH:mm:ss yyyy"]
            target => "report_host_end"
            locale => "en_US"
        }
    }
}

output {
    elasticsearch {
        hosts => ["192.168.1.152:9200"]
        index => "nessus-data-%{+YYYY.MM.dd}"
        user => elastic
        password => password
    }
    stdout { codec => rubydebug }
}

I can send my sample .nessus file to your inbox if you want to take a look at how the XML is structured.

EDIT: Logstash is able to pass the data to Elasticsearch, but in Kibana there are no fields from the xpath expressions.

If xpath is not working, try setting store_xml to true, supplying a target, and verifying that the rubydebug output looks the way you expect.
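
For example (a sketch; the target field name is arbitrary):

        xml {
            source => "message"
            store_xml => true
            # The parsed document lands under this field (e.g. [doc]),
            # which you can then inspect in the rubydebug output.
            target => "doc"
        }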

You'll want to set the codec's auto_flush_interval option to something quite low. That'll make sure that Logstash stops waiting for the next <?NessusClientData_v2> line that'll never come.
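
For example (a sketch; keep whatever pattern you end up with):

        codec => multiline {
            pattern => "^<\?NessusClientData_v2 .*\>"
            negate => "true"
            what => "previous"
            # Flush the pending event after one second of inactivity instead
            # of waiting indefinitely for another line matching the pattern.
            auto_flush_interval => 1
        }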

I tried setting store_xml to true and set the target, but the fields declared in the xpath expressions are still not showing in Kibana. Below are the current settings. I will send my .nessus sample to you.

input {
    file {
        path => "/home/user/nessus/*"
        start_position => "beginning"
        codec => multiline {
            pattern => "^<\?NessusClientData_v2 .*\>"
            auto_flush_interval => 1
            negate => "true"
            what => "previous"
        }
        tags => "nessus"
        type => "nessus"
    }
}

filter {
    ## Interpret the message as XML
    if [type] == "nessus" {
        xml {
            source => "message"
            store_xml => true
            target => "doc"
            xpath => [
                "/NessusClientData_v2/Report/@name", "report_name",
                "/NessusClientData_v2/Report/ReportHost/text()", "report_host",
                "/NessusClientData_v2/Report/ReportHost/ReportItem/text()", "report_item",
                "/NessusClientData_v2/Report/ReportHost/@name", "report_host_name",
                "/NessusClientData_v2/Report/ReportHost/HostProperties/tag[@name='HOST_START']/text()", "report_host_start",
                "/NessusClientData_v2/Report/ReportHost/HostProperties/tag[@name='HOST_END']/text()", "report_host_end",
                "/NessusClientData_v2/Report/ReportHost/ReportItem/@port", "report_item_port",
                "/NessusClientData_v2/Report/ReportHost/ReportItem/@severity", "risk_score"
            ]
        }
        mutate {
            convert => {
                "risk_score" => "integer"
            }

        }
        date {
            match => ["report_host_start", "EEE MMM dd HH:mm:ss yyyy"]
            target => "report_host_start"
            locale => "en_US"
        }
        date {
            match => ["report_host_end", "EEE MMM dd HH:mm:ss yyyy"]
            target => "report_host_end"
            locale => "en_US"
        }
    }
}

What does an example event produced by Logstash look like? Use a stdout { codec => rubydebug } output or copy/paste from Kibana's JSON tab.
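
For example, an output section like this prints every event, with all the fields the filters have added, to the console Logstash runs in:

output {
    # rubydebug pretty-prints the complete event.
    stdout { codec => rubydebug }
}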

Sorry, I don't know how to view the output from stdout { codec => rubydebug }, but I will paste the JSON output from Kibana. Below is one of the events from Kibana:

{
  "_index": "nessus-data-2018.02.22",
  "_type": "doc",
  "_id": "nltpvGEBvBe04uIeWE7A",
  "_version": 1,
  "_score": null,
  "_source": {
    "@version": "1",
    "path": "/home/user/nessus/small.nessus",
    "tags": [
      "nessus"
    ],
    "host": "debian",
    "type": "nessus",
    "message": "</NessusClientData_v2>",
    "@timestamp": "2018-02-22T07:27:55.965Z"
  },
  "fields": {
    "@timestamp": [
      "2018-02-22T07:27:55.965Z"
    ]
  },
  "sort": [
    1519284475965
  ]
}

This is confusing. This event indicates that the multiline codec configuration is wrong, but

        pattern => "^<\?NessusClientData_v2 .*\>"

clearly shouldn't match anything except maybe the first line of the document (which I haven't looked at).
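
If the file follows the standard .nessus v2 layout (an XML declaration followed by a <NessusClientData_v2> root element; I'm assuming that here, since I haven't seen the file), anchoring the pattern on the root element would make more sense, e.g.:

        codec => multiline {
            # Match the opening tag of the root element; note that, unlike
            # the XML declaration, it contains no "?".
            pattern => "^<NessusClientData_v2"
            negate => true
            what => "previous"
            auto_flush_interval => 1
        }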

I changed my pattern following this example on GitHub (https://github.com/patrick-vonsteht/logstash-multiline-xml-file-parsing). The data is showing in Kibana, but not all of it; it looks like parsing stops halfway. Below are the modifications made to the conf file:

input {
  file {
    path => "/home/user/nessus/*"
    sincedb_path => "/dev/null"
    start_position => "beginning"
    codec => multiline {
      pattern => "<Report |</NessusClientData_v2>"
      auto_flush_interval => 1
      negate => "true"
      what => "previous"
    }
    tags => "nessus"
    type => "nessus"
  }
}

filter {
    ## Interpret the message as XML
    if [type] == "nessus" {
        xml {
            source => "message"
            store_xml => "false"
            xpath => ["/Report/@name", report_name]
            xpath => ["/Report/ReportHost/ReportItem/@pluginName", plugin_name]
            xpath => ["/Report/ReportHost/ReportItem/@pluginID", plugin_id]
            xpath => ["/Report/ReportHost/ReportItem/@severity", risk_score]https://github.com/patrick-vonsteht/logstash-multiline-xml-file-parsing
            xpath => ["/Report/ReportHost/ReportItem/@port", port]
            xpath => ["/Report/ReportHost/HostProperties/tag[@name='HOST_START']/text()", report_host_start]
            xpath => ["/Report/ReportHost/HostProperties/tag[@name='HOST_END']/text()", report_host_end]
        }
        mutate {
            #remove_field => ["message"]
            convert => {
                "risk_score" => "integer"
            }
        }
        date {
            match => ["report_host_start", "EEE MMM dd HH:mm:ss yyyy"]
            target => "report_host_start"
            locale => "en_US"
        }
        date {
            match => ["report_host_end", "EEE MMM dd HH:mm:ss yyyy"]
            target => "report_host_end"
            locale => "en_US"
        }
    }
}

output {
    elasticsearch {
        hosts => ["192.168.1.152:9200"]
        index => "nessus-data-%{+YYYY.MM.dd}"
        user => elastic
        password => password
    }
    stdout { codec => rubydebug }
}

The data is showing in Kibana, but not all of it; it looks like parsing stops halfway.

Okay, but please show us what you get.

Have you looked at VulnWhisperer? It was featured in an ES video recently. It handles exports from Nessus and might save you some formatting legwork.

Mileage may vary
-krw

Yes, I've looked at VulnWhisperer but decided not to go with it, as it requires CSV format, which doesn't match my project requirements.

Below are the collected pluginID values, which show only 5 IDs.

Meanwhile, more than 312 IDs exist inside the XML data.

Please post what solution you come up with. I'm trying to solve this exact same problem right now.

Is it by any chance only displaying five items because the table visualization by default only displays five items?

What I'd like to see is a) the input document and b) the event(s) produced by Logstash. Copy/paste from Kibana's JSON tab is fine.

My bad, by default it shows only 5, but when I raise the value above 5 it still doesn't display all of the data.

a) I've sent the sample Nessus file to your message box via Dropbox (if this is what you mean by input document).
b) Is it an event from the Logstash log? (Sorry, I'm not so sure.)

[2018-02-23T07:52:00,695][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@192.168.1.152:9200/]}}
[2018-02-23T07:52:00,696][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@192.168.1.152:9200/, :path=>"/"}
[2018-02-23T07:52:00,967][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@192.168.1.152:9200/"}
[2018-02-23T07:52:01,081][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>nil}
[2018-02-23T07:52:01,120][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2018-02-23T07:52:01,138][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-02-23T07:52:01,302][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-02-23T07:52:01,536][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//192.168.1.152:9200"]}
[2018-02-23T07:52:21,211][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x7e4df78@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 run>"}
[2018-02-23T07:52:21,835][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}

I've sent the sample Nessus file to your message box via Dropbox (if this is what you mean by input document).

Can't you post it here or in a gist so that everyone can see it?

Is it an event from the Logstash log? (Sorry, I'm not so sure.)

No, I guess the stdout output logs to a separate file in the log directory. I don't recall how recent Logstash versions work.

OK, here I've included one of the other Nessus scan samples in a gist (https://gist.github.com/najmisyahir/7a1d2c17257ced61257e3033cd55d090).

I was able to change the log level from info to debug in the logstash.yml config file. Here is the log from when the service started until the XML stopped parsing (https://gist.github.com/najmisyahir/955ac3d3d929290b0894085764518680).

I was able to change the log level from info to debug in the logstash.yml config file. Here is the log from when the service started until the XML stopped parsing.

The debug logs are not interesting. The output from a stdout { codec => rubydebug } output is.

Can you guide me on how to display that rubydebug output? I'm currently at a loss as to how to display it.

EDIT: I don't know if this is rubydebug output (https://gist.github.com/najmisyahir/1717f67a41fa61c785affa43b3913cb5). I just ran this command in my console from /usr/share/logstash/:

sudo bin/logstash -f /etc/logstash/conf.d/test.conf