Correlation of IPs within ELK


I am trying to automate and simplify a procedure for reviewing firewall rules within ELK (Elasticsearch, Logstash, Kibana). I have some data exported to a CSV, which is structured like this:

tcp:53;accept;No.10: ID: INC0000000001

My objective is to import this data into ELK, parse each field for subnets and/or IP addresses and, if possible, add sequentially numbered fields (IP_Source1, IP_Destination2, etc.), each containing one of them. The goal is to correlate the IPs, for example to see whether an address is contained within one of the subnets (aggregation of rules), and to enrich the data with other inputs.

Is this possible, to your knowledge? How?

Thanks a lot for any input you might have.

Yes, this can be done with a mix of the various filters Logstash has.

I'd start with the CSV filter to parse the events, and go from there.

Hello warkolm, thanks a lot for your reply. I was looking for a bit more detail, if possible.
I beautified the CSV and created the corresponding filter in Logstash to separate the source, destination, service, action and comment, but I have no idea how to progress from there to parse the IPs within the source and destination fields and correlate them.

Showing your config would be super handy :)

Hello warkolm, below is my current configuration for Logstash:

input {
	file {
		path => "/home/alessandro/Desktop/Exports/Test/test.csv"
		start_position => "beginning"
		type => "test"
	}
}

filter {
	csv {
		separator => ";"
		columns => ["Source","Destination","Service","Action","Comment"]
	}
}

output {
	elasticsearch { hosts => ["localhost:9200"] }
	stdout { codec => rubydebug }
}

The input has already been beautified wherever possible, and I also mocked up some Ruby code (with some help, because I'm not very good with that language). You can find the snippet below, but I don't know how to use it:

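filter {
    ruby {
        code => "
            # Split the space-separated Source field (the column name set by the csv filter)
            # and store each entry in its own numbered field: IP_Source1, IP_Source2, ...
            arr = event.get('Source').to_s.split(' ')
            arr.each.with_index(1) do |a, index|
                event.set('IP_Source' + index.to_s, a)
                puts a
            end
        "
    }
}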

The script should scan the IP addresses contained in the source field, split them into separate list entries, and then print them out.
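
To make the intent concrete outside of Logstash, here is a minimal standalone Ruby sketch of the same idea. It assumes the source field holds space-separated entries and that subnets are written in CIDR notation; the sample addresses and the helper names numbered_fields and containing_subnets are made up for illustration and are not part of my real data or of any Logstash API. It uses IPAddr from Ruby's standard library for the containment check.

```ruby
require 'ipaddr'

# Hypothetical helper: split a space-separated value into numbered fields,
# e.g. "IP_Source1", "IP_Source2", ...
def numbered_fields(value, prefix)
  value.to_s.split(' ').each.with_index(1).map do |entry, index|
    ["#{prefix}#{index}", entry]
  end.to_h
end

# Hypothetical helper: return the CIDR entries that contain the given address.
# Entries without a "/" are treated as single hosts and skipped.
def containing_subnets(address, entries)
  ip = IPAddr.new(address)
  entries.select do |entry|
    entry.include?('/') && IPAddr.new(entry).include?(ip)
  end
end

# Made-up sample value standing in for one "Source" column.
source = '10.1.2.3 10.1.0.0/16 192.168.0.0/24'

fields = numbered_fields(source, 'IP_Source')
matches = containing_subnets('10.1.2.3', fields.values)

puts fields.inspect
puts matches.inspect
```

Inside a Logstash ruby filter, the same logic would presumably read the value with event.get and write each entry back with event.set, rather than building a hash.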

Really appreciate your help.

Another idea is to use Kibana to accomplish my objective, but I'm not sure how to use it for this either. I'd appreciate any input on this as well, if you have it.

If you map the IP field correctly (that is, as an Elasticsearch ip field), then you can run subnet queries in Kibana :)
