Split ES event

Hello,

At the moment I have one event. The console shows the following output:

{
   port => [
      [0] "22",
      [1] "443",
      [2] "623",
      [3]  "0",
      [4] "22",
      [5] "80"
   ],
   "path" =>"/home/data/test.xml"
   "timestamp" => ......
   "host-ip" => [
      [0] "1.111.11.111"
      [1] "1.111.11.112"
      [2] "1.111.11.114"
    ],
}

My aim is to get multiple events, each with host-ip, path, timestamp and port. I want one ES event for every port. It should look like this:

{
  {
    "port" => "22",
    "host-ip" => "1.111.11.111",
    "timestamp" => ".....",
    "path" => "/home/data/test.xml"
  },
  {
    "port" => "443",
    "host-ip" => "1.111.11.111",
    "timestamp" => ".....",
    "path" => "/home/data/test.xml"
  },
  {
    "port" => "623",
    "host-ip" => "1.111.11.111",
    "timestamp" => ".....",
    "path" => "/home/data/test.xml"
  },
  {
    "port" => "0",
    "host-ip" => "1.111.11.112",
    "timestamp" => ".....",
    "path" => "/home/data/test.xml"
  },
  {
    "port" => "22",
    "host-ip" => "1.111.11.112",
    "timestamp" => ".....",
    "path" => "/home/data/test.xml"
  },
  {
    "port" => "80",
    "host-ip" => "1.111.11.114",
    "timestamp" => ".....",
    "path" => "/home/data/test.xml"
  }
}

This is my filter

filter {
  xml {
    target => "doc"
    store_xml => "false"
    source => "message"
    xpath => [ 
        "//Host/tag[@name='host-ip']/text()", "host-ip",
        "//ReportItem/@port","Port"
    ]
  } 
  mutate {
    remove_field => ["message", "host", "tags"]         
  }
}

Can someone help me achieve my goal? It would be nice if I could do this with a script, because I want to index more than one file, and the number of entries in the host-ip and port fields can be different every time.

I read that I need to use the ruby filter, but I have never used Ruby and I have absolutely no idea how to achieve this.

First of all: Thank you for giving examples for the desired data structure. I'm not a big fan of people who say something like "I want to process these events..." but never really say what they would like the output to look like (or even worse: They don't even provide the input).

Could you please explain further which IP should be combined with which port? I can't seem to grasp the logic behind your combinations.

If you wanted to create all possible combinations you could use two split filters. But based on your example I guess that's not the goal. So you'll probably have to loop through the port field in Ruby, do a new_event = event.clone, manipulate the ip and port field of the clone, call new_event_block.call(new_event) and cancel the original event.

I have some XML log files from vulnerability scans which I would like to index. I would like to combine every single port with a single IP. The problem is that the logs are structured so that I have multiple ports per IP. For example, the first 5 ports belong to ip[0], the next 10 ports belong to ip[1], and the last x ports belong to ip[2]. At the moment I can't see any fixed structure, because the number of scanned ports can be different every time.

Maybe I built my config badly, so I'm not able to achieve my goal.

Yes, I think the structure before the split has already lost some basic information, so we cannot assign the right IPs and ports to each other. We need data like

"hosts" => [
  { "ip" => "..", "port" => ".." },
  { "ip" => "..", "port" => ".." }
]

or at least arrays of the same size for host and port, so entries with the same index belong to each other.
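
To illustrate the "same index belongs together" idea, here is a minimal Ruby sketch that assumes you already have an IP array and a port array of equal length. The helper name pair_by_index and the shared hash are made up for this example, and wiring it into a Logstash ruby filter is not shown.

```ruby
# Pair two arrays of equal length by index: ips[i] belongs to ports[i].
# `pair_by_index` and `shared` are hypothetical names for this sketch.
def pair_by_index(ips, ports, shared = {})
  raise ArgumentError, "arrays differ in length" unless ips.length == ports.length

  # zip walks both arrays in parallel; merge adds the fields common to all records.
  ips.zip(ports).map do |ip, port|
    shared.merge("host-ip" => ip, "port" => port)
  end
end

ips     = ["11.111.11.111", "11.111.11.111", "11.111.11.112", "1.111.11.114"]
ports   = ["0", "80", "23", "22"]
records = pair_by_index(ips, ports, { "path" => "/home/data/test.xml" })
records.each { |record| puts record.inspect }
```

The length check is deliberate: if the arrays differ in size, the index no longer tells you which IP a port belongs to, so failing loudly seems safer than guessing.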

I don't have much experience with xpath, but, if you post your original XML, maybe someone can help.

This is the structure of my XML file

<Report>
<ReportHost name="1.111.11.111"><HostProperties>
<tag name="host-ip">11.111.11.111</tag></HostProperties>
<ReportItem port="0" svc_name="general" protocol="tcp" severity="0">
<description>teeext</description>
<fname>teeext</fname>
</ReportItem>
<ReportItem port="80" svc_name="general" protocol="tcp" severity="5">
<description>teeext</description>
<fname>teeext</fname>
</ReportItem>
</ReportHost>
<ReportHost name="11.111.11.112"><HostProperties>
<tag name="host-ip">11.111.11.112</tag></HostProperties>
<ReportItem port="23" svc_name="general" protocol="tcp" severity="3">
<description>teeext</description>
<fname>teeext</fname>
</ReportItem>
</ReportHost>
<ReportHost name="1.111.11.114"><HostProperties>
<tag name="host-ip">1.111.11.114</tag></HostProperties>
<ReportItem port="22" svc_name="general" protocol="tcp" severity="8">
<description>teeext</description>
<fname>teeext</fname>
</ReportItem>
</ReportHost>
</Report>

It's a file with 200,000+ lines and more XML elements than I show in this sample.
What I have achieved so far is indexing every single ReportItem, with all its attributes and elements, as a separate event. I want the host-ip in these events too, but I don't know how to access it. The number of ReportItem elements inside each ReportHost element varies...

I used the file input with the multiline codec and the xml filter with XPath to achieve this, but this doesn't work for the host-ip field.

After a quick look at an online xpath tester:

Maybe something like that can help:

//ReportHost/ReportItem/concat(..//tag[@name='host-ip']/text(), ' : ', @port)

String='11.111.11.111 : 0'
String='11.111.11.111 : 80'
String='11.111.11.112 : 23'
String='1.111.11.114 : 22'

You could split based on that and separate IP and port again with grok afterwards? I really have nearly zero experience with xpath. But that result looked like it could be a step in the right direction.
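
If grok feels heavy for this, the separation step could probably also be done in plain Ruby. This is only a sketch, assuming the strings really arrive in the "IP : PORT" form shown above; split_pair is a hypothetical helper name, not part of Logstash.

```ruby
# Split strings of the form "IP : PORT" back into separate fields.
# `split_pair` is a hypothetical helper; the " : " separator matches the
# concat() expression quoted above.
def split_pair(text)
  ip, port = text.split(" : ", 2)
  raise ArgumentError, "unexpected format: #{text.inspect}" if ip.nil? || port.nil?

  { "host-ip" => ip.strip, "port" => port.strip }
end

pairs  = ["11.111.11.111 : 0", "11.111.11.111 : 80", "11.111.11.112 : 23", "1.111.11.114 : 22"]
parsed = pairs.map { |pair| split_pair(pair) }
parsed.each { |record| puts record.inspect }
```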

I think //ReportHost/ReportItem/concat(..//tag[@name='host-ip']/text(),'') could give you an array of IPs that has the same size as your port array, but then we'd need a Ruby filter to combine the data, I guess.

String='11.111.11.111'
String='11.111.11.111'
String='11.111.11.112'
String='1.111.11.114'

I am able to get the following output:

{
    "@timestamp" => 2018-10-26T11:43:50.747Z,
      "@version" => "1",
          "type" => "test",
          "path" => "/home/data/test.xml",
       "message" => "<ReportHost name=\"1.111.11.111\"><HostProperties>\n<tag name=\"host-ip\">1.111.11.111</tag></HostProperties>\n<ReportItem port=\"0\" svc_name=\"general\" protocol=\"tcp\" severity=\"0\">\n<description>teeext</description>\n<fname>teeext</fname>\n</ReportItem>\n<ReportItem port=\"0\" svc_name=\"general\" protocol=\"tcp\" "severity=\"0\">\n<description>teeext</description>\n<fname>teeext</fname>\n</ReportItem>\n<ReportItem port=\"80\" svc_name=\"general\" protocol=\"tcp\" severity=\"3\"<description>teeext</description>\n<fname>teeext</fname>\n</ReportItem>\n</ReportHost>"
          "host" => "srv-elastic",
          "tags" => [
        [0] "multiline"
    ],
       "@version" => "1"
}
{
  "@timestamp" => 2018-10-26T11:43:50.747Z,
      "@version" => "1",
          "type" => "test",
          "path" => "/home/data/test.xml",
       "message" => "<ReportHost name=\"1.111.11.112\"><HostProperties>\n<tag name=\"host-ip\">1.111.11.112</tag></HostProperties>\n<ReportItem port=\"23\" svc_name=\"general\" protocol=\"tcp\" severity=\"3\">\n<description>teeext</description>\n<fname>teeext</fname>\n</ReportItem>\n</ReportHost>"
          "host" => "srv-elastic",
          "tags" => [
        [0] "multiline"
    ],
       "@version" => "1"
}

I will experiment with your input now. Thanks a lot so far.

For the sake of completeness: If you need multiple attributes from the parent item and the reports, it might be easier to just split your XML in multiple steps instead of trying to access the data directly.

filter {
  xml {
    store_xml => false
    source => "message"
    xpath => [
        "//ReportHost", "hostdata"
    ]
  }
  split {
    field => "hostdata"
  }
  xml {
    store_xml => false
    source => "hostdata"
    xpath => [
        "//ReportItem", "reportitem",
        "//tag[@name='host-ip']/text()", "ip"
    ]
  }
  split {
    field => "reportitem"
  }
  xml {
    target => "reportdata"
    source => "reportitem"
  }
  mutate {
    remove_field => ["message", "hostdata", "reportitem"]
    rename => {"[ip][0]" => "[reportdata][ip]"}
  }
}

{
            "ip" => [],
          "host" => "elasticsearch-vm",
      "@version" => "1",
    "reportdata" => {
           "svc_name" => "general",
                 "ip" => "1.111.11.114",
        "description" => [
            [0] "teeext"
        ],
               "port" => "22",
           "protocol" => "tcp",
           "severity" => "8",
              "fname" => [
            [0] "teeext"
        ]
    },
    "@timestamp" => 2018-10-26T13:01:59.450Z
}


(Alternative with less XML parsing:)

filter {
  xml {
    target => "doc"
    source => "message"
  }
  split {
    field => "[doc][ReportHost]"
  }
  split {
    field => "[doc][ReportHost][ReportItem]"
  }
  mutate {
    remove_field => ["message"]
  }
}

{
          "host" => "elasticsearch-vm",
      "@version" => "1",
    "@timestamp" => 2018-10-26T13:19:48.436Z,
           "doc" => {
        "ReportHost" => {
                "ReportItem" => {
                      "fname" => [
                    [0] "teeext"
                ],
                   "severity" => "8",
                "description" => [
                    [0] "teeext"
                ],
                       "port" => "22",
                   "protocol" => "tcp",
                   "svc_name" => "general"
            },
                      "name" => "1.111.11.114",
            "HostProperties" => [
                [0] {
                    "tag" => [
                        [0] {
                               "name" => "host-ip",
                            "content" => "1.111.11.114"
                        }
                    ]
                }
            ]
        }
    }
}

(In both versions you'll have some cleanup to do to get a nice finished structure :wink:)
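
As a rough idea of what that cleanup could look like for the second variant, here is a plain Ruby sketch that flattens one event of the shape shown in the rubydebug output above. The helper names as_list and flatten_report_event are made up for this example. Inside Logstash the same logic would have to live in a ruby filter and read and write fields through event.get and event.set, which is not shown here.

```ruby
# Normalise a value to an array (nil becomes [], a single value becomes [value]).
def as_list(value)
  return [] if value.nil?

  value.is_a?(Array) ? value : [value]
end

# Flatten one already-split event into a flat record.
# Key names follow the rubydebug output above; the helper itself is hypothetical.
def flatten_report_event(event)
  host = event.fetch("doc").fetch("ReportHost")
  item = host.fetch("ReportItem")

  # Collect every <tag> hash under HostProperties and pick the host-ip one.
  tags   = as_list(host["HostProperties"]).flat_map { |props| as_list(props["tag"]) }
  ip_tag = tags.find { |tag| tag["name"] == "host-ip" }

  {
    "host-ip"     => ip_tag && ip_tag["content"],
    "port"        => item["port"],
    "protocol"    => item["protocol"],
    "severity"    => item["severity"],
    "svc_name"    => item["svc_name"],
    # Single-element arrays such as ["teeext"] are reduced to their first value.
    "description" => as_list(item["description"]).first,
    "fname"       => as_list(item["fname"]).first
  }
end

event = {
  "doc" => {
    "ReportHost" => {
      "name" => "1.111.11.114",
      "HostProperties" => [{ "tag" => [{ "name" => "host-ip", "content" => "1.111.11.114" }] }],
      "ReportItem" => {
        "port" => "22", "protocol" => "tcp", "severity" => "8", "svc_name" => "general",
        "description" => ["teeext"], "fname" => ["teeext"]
      }
    }
  }
}
flat = flatten_report_event(event)
puts flat.inspect
```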


The idea is good, but I get errors for this line.

I'll try to do it in multiple steps. I have been working with ELK for nearly two months now, and all the tasks I have had so far worked by accessing the data directly. Thanks for your advice, I'll try it out.

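Maybe the library that is used by Logstash only supports XPath 1.0. concat() itself exists in 1.0, but using a function call as a path step, as in //ReportItem/concat(...), is XPath 2.0 syntax, so the expression would be rejected :disappointed_relieved: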

One question: what is your input for the alternative (with less XML parsing)? I get _xmlparsefailure and _split_type_failure when I try this filter. Do I have to change something there?

Does your input file have the same structure as my sample? I don't have one element per line there.

You've already helped me a lot, thanks for that.

I had this test config:

input {
  stdin {}
}
filter {
  xml {
    target => "doc"
    source => "message"
  }
  split {
    field => "[doc][ReportHost]"
  }
  split {
    field => "[doc][ReportHost][ReportItem]"
  }
  mutate {
    remove_field => ["message"]
  }
}
output {
  stdout { codec => rubydebug }
}

and inserted this message via stdin:
<Report><ReportHost name="1.111.11.111"><HostProperties><tag name="host-ip">11.111.11.111</tag></HostProperties><ReportItem port="0" svc_name="general" protocol="tcp" severity="0"><description>teeext</description><fname>teeext</fname></ReportItem><ReportItem port="80" svc_name="general" protocol="tcp" severity="5"><description>teeext</description><fname>teeext</fname></ReportItem></ReportHost><ReportHost name="11.111.11.112"><HostProperties><tag name="host-ip">11.111.11.112</tag></HostProperties><ReportItem port="23" svc_name="general" protocol="tcp" severity="3"><description>teeext</description><fname>teeext</fname></ReportItem></ReportHost><ReportHost name="1.111.11.114"><HostProperties><tag name="host-ip">1.111.11.114</tag></HostProperties><ReportItem port="22" svc_name="general" protocol="tcp" severity="8"><description>teeext</description><fname>teeext</fname></ReportItem></ReportHost></Report>

