Filter IP addresses by subnet

Hi all,

I need to enrich my firewall logs with the subnet address, subnet name and subnet location. I have a CSV file with all this information. Here is a similar case: https://discuss.elastic.co/t/cidr-subnet-relating/76757

Unfortunately there is no plugin able to achieve this. I tried the CIDR filter, but I realized it only returns a boolean when an IP matches a subnet. In my case I need to know which subnet matched so I can enrich the log.
Btw this is a requested feature for the CIDR plugin that has still not been implemented (https://github.com/logstash-plugins/logstash-filter-cidr/issues/16).
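
To illustrate the limitation, here is a minimal plain-Ruby sketch (with hypothetical subnet and address values) of the kind of check the CIDR filter performs via IPAddr:

```ruby
# IPAddr#include? tells you *whether* an address falls inside a subnet,
# but returns no information about *which* subnet matched.
require 'ipaddr'

subnet = IPAddr.new('10.1.0.0/16')  # hypothetical subnet
ip     = IPAddr.new('10.1.2.3')     # hypothetical address

puts subnet.include?(ip)  # prints "true" -- a bare boolean, no metadata
```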

To solve this problem I wrote this:

ruby
{
	code => "
		require 'csv'
		require 'ipaddr'

		network_address = Array.new
		network_location = Array.new
		network_name = Array.new

		### Read the CSV file and split it by column
		CSV.foreach('subnet_list.csv', col_sep: ';', row_sep: :auto, headers: true) do |row|
		  network_address.push(IPAddr.new(row[0]))
		  network_location.push(row[1])
		  network_name.push(row[2])
		end

		### Take the src_ip and dst_ip that were parsed by my grok filter
		ip_src = Array.new
		ip_src.push(IPAddr.new(event.get('[src_ip]')))

		ip_dst = Array.new
		ip_dst.push(IPAddr.new(event.get('[dst_ip]')))

		i = 0
		### Cross the subnet array with the source and destination address
		### (similar to what the CIDR filter does); i tracks the subnet index
		network_address.product(ip_src, ip_dst).each do |n, src, dst|
		  if n.include?(src)
		    event.set('[src_network_address]', n.to_s)
		    event.set('[src_network_location]', network_location[i])
		    event.set('[src_network_name]', network_name[i])
		  elsif n.include?(dst)
		    event.set('[dst_network_address]', n.to_s)
		    event.set('[dst_network_location]', network_location[i])
		    event.set('[dst_network_name]', network_name[i])
		  end
		  i += 1
		end
	"
}

This code works and meets my needs. However it is substantially inefficient (my indexing rate is severely impacted). It is probably due to re-reading the file for every event, even though it is only 300 rows long.
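
To make that concrete, the parsing step itself is cheap once extracted. Here is the same column-splitting in plain Ruby (the header names and subnet values are hypothetical); it only needs to run once at startup rather than once per event:

```ruby
require 'csv'
require 'ipaddr'

# Hypothetical CSV contents mirroring the assumed subnet_list.csv layout:
# a header row and three semicolon-separated columns.
data = "address;location;name\n" \
       "10.1.0.0/16;Paris;lan-paris\n" \
       "10.2.0.0/16;Lyon;lan-lyon\n"

network_address  = []
network_location = []
network_name     = []

# Split each row into the three parallel lookup arrays.
CSV.parse(data, col_sep: ';', headers: true).each do |row|
  network_address.push(IPAddr.new(row[0]))
  network_location.push(row[1])
  network_name.push(row[2])
end

p network_location  # parsed once, reusable for every event
```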

I also thought about storing the data in an index instead of a CSV file, but that seems like a bad idea (the network latency could make things worse).

Is there a better way to do this? Any idea is welcome.

Thanks in advance

Move the code that reads the file from code => '' to init => '' and stash those arrays in class variables?


Thanks for your answer. That seems like a good solution. I'll dive into it and let you know how it works.

Thanks again. I implemented your idea and now I have this code:

ruby
{
	init =>
	"
		require 'csv'
		require 'ipaddr'

		@@network_address = Array.new
		@@network_location = Array.new
		@@network_name = Array.new

		CSV.foreach('/home/ndraris/logstash-ndraris-lab/Liste_sousréseaux.csv', col_sep: ';', row_sep: :auto, headers: true) do |row|
		  @@network_address.push(IPAddr.new(row[0]))
		  @@network_location.push(row[1])
		  @@network_name.push(row[2])
		end
	"
	code =>
	"
		ip_src = Array.new
		ip_src.push(IPAddr.new(event.get('[src_ip]')))

		ip_dst = Array.new
		ip_dst.push(IPAddr.new(event.get('[dst_ip]')))

		i = 0
		found = 0
		@@network_address.product(ip_src, ip_dst).each do |n, src, dst|
		  ### Stop as soon as both src_ip and dst_ip have matched
		  break if found > 1
		  if n.include?(src)
		    event.set('[src_network_address]', n.to_s)
		    event.set('[src_network_location]', @@network_location[i])
		    event.set('[src_network_name]', @@network_name[i])
		    found += 1
		  elsif n.include?(dst)
		    event.set('[dst_network_address]', n.to_s)
		    event.set('[dst_network_location]', @@network_location[i])
		    event.set('[dst_network_name]', @@network_name[i])
		    found += 1
		  end
		  i += 1
		end
	"
}

I also added a "found" counter that stops the search once both src_ip and dst_ip have matched.
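
Along the same lines, a further refactor is possible (a plain-Ruby sketch with hypothetical subnets, not the exact filter code): iterating with each_with_index and breaking once both sides have matched avoids building the product array and the manual counter, and it also handles the case where a single subnet contains both addresses (with the elsif above, only the src fields get set in that case):

```ruby
require 'ipaddr'

# Hypothetical lookup tables, as built once in init.
networks  = [IPAddr.new('10.1.0.0/16'), IPAddr.new('10.2.0.0/16')]
locations = ['Paris', 'Lyon']
names     = ['lan-paris', 'lan-lyon']

src = IPAddr.new('10.1.2.3')  # hypothetical src_ip
dst = IPAddr.new('10.2.9.9')  # hypothetical dst_ip

# In the real filter these would be event.set calls; a hash stands in here.
result = {}
networks.each_with_index do |n, i|
  if !result.key?('src_network_name') && n.include?(src)
    result['src_network_address']  = n.to_s
    result['src_network_location'] = locations[i]
    result['src_network_name']     = names[i]
  end
  if !result.key?('dst_network_name') && n.include?(dst)
    result['dst_network_address']  = n.to_s
    result['dst_network_location'] = locations[i]
    result['dst_network_name']     = names[i]
  end
  # Stop scanning as soon as both sides are enriched.
  break if result.key?('src_network_name') && result.key?('dst_network_name')
end

p result
```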

It seemed faster when I ran it in my lab. I will try it in production and see how it goes.

A @@variable is a class variable, shared across all instances; you should not need that here. A plain @variable (instance variable) should work.
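
To illustrate the difference outside Logstash (with a hypothetical class): a @@class variable is shared by every instance, while a @instance variable belongs to one object, which is the safer default in a filter:

```ruby
class Counter
  @@total = 0  # class variable: one value shared by all instances

  def initialize
    @count = 0  # instance variable: one value per object
  end

  def bump
    @count += 1
    @@total += 1
  end

  attr_reader :count

  def total
    @@total
  end
end

a = Counter.new
b = Counter.new
a.bump
b.bump
puts a.count  # prints 1 -- each instance keeps its own @count
puts a.total  # prints 2 -- @@total is shared across instances
```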

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.