Compare two datasets

Hello,

I have a list containing millions of malicious IP addresses, and I am using ELK to collect and analyze data from multiple firewalls.

I receive millions of logs per day from the firewalls, and my malicious IP address list changes constantly.

I want to create a search or something else that compares the firewall logs against this list of malicious IP addresses.
If there is any match, the search should return the results, create an alarm, whatever. I just want to be aware of any match.

Can someone point me in a direction?
I have been exhausting my neurons on this task.


Hi Jack,

An IPv4 address is made of 4 parts:
A.B.C.D

You need to analyse this IP address with a multi-field search.

  1. The first analyser can be a stop analyser that separates the IP address on the basis of the . (dot).
  2. The second analyser can be a standard analyser with a char filter that removes the dots from the IP address and treats the entire address as a single string.

So when you run your search, you can multi-search the new IP address against both analysed forms of the text. Any score beyond 80 percent can be considered a malicious IP; the percentage can be tweaked per use case.
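
If it helps, here is a rough sketch of what I mean as an index mapping plus query. This is untested; the index name malicious-ips and the field names are my own placeholders, and I used a pattern_replace char filter for the dot-stripping step:

PUT malicious-ips
{
  "settings": {
    "analysis": {
      "char_filter": {
        "strip_dots": { "type": "pattern_replace", "pattern": "\\.", "replacement": "" }
      },
      "analyzer": {
        "ip_octets": { "type": "pattern", "pattern": "\\." },
        "ip_joined": { "type": "custom", "tokenizer": "keyword", "char_filter": ["strip_dots"] }
      }
    }
  },
  "mappings": {
    "properties": {
      "ip": {
        "type": "text",
        "analyzer": "ip_octets",
        "fields": {
          "joined": { "type": "text", "analyzer": "ip_joined" }
        }
      }
    }
  }
}

Then a multi_match across both forms gives you a fuzzy score to threshold:

GET malicious-ips/_search
{
  "query": {
    "multi_match": {
      "query": "1.2.3.4",
      "fields": ["ip", "ip.joined"]
    }
  }
}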

Let me know if this works for you.

Actually, I am following another approach...
I was trying to compare two indices in Elasticsearch, and it was consuming a lot of time without getting anywhere.

I think the best way to compare the malicious-host data with the logs from the firewalls is to do a lookup/translate as soon as we receive the logs and enrich the incoming log with additional information.
Basically, my plan is to do a lookup/translate on the src/dst IP. If there is a match, I will tag the src/dst IP as malicious.

Here is the Logstash filter I am preparing:

filter {
  translate {
    field => "source_ip"                          # source IP from the original log
    destination => "malicious"                    # new field created to tag the IP as malicious
    dictionary_path => '/opt/minemeld/ipv4.yaml'  # list of malicious addresses in YAML format
    refresh_interval => 300                       # refresh interval for the YAML file, in seconds
  }
}

The YAML file should look like this:

"1.1.1.1": "TRUE"
"2.2.2.2": "TRUE"

I can't test it today, but I believe it will work.
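
For the alarm part, I am also thinking of a conditional on the new field in the output. A rough, untested sketch; the email output and its settings are placeholders, and any output plugin would do:

output {
  if [malicious] == "TRUE" {
    # send tagged events to an alerting channel; the address is a placeholder
    email {
      to => "soc@example.com"
      subject => "Malicious IP seen: %{source_ip}"
    }
  }
}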

Here is more info for those who face the same issue.


That's the best way; that, or load the data into an Elasticsearch index and then use the Logstash elasticsearch filter to do a lookup against that index.
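
A rough sketch of that variant, assuming the bad IPs live in an index called malicious-ips with an ip field (all names here are placeholders):

filter {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "malicious-ips"                    # lookup index holding the bad IPs
    query => "ip:%{[source][ip]}"               # find a doc whose ip matches the event's source IP
    fields => { "ip" => "malicious_ip_match" }  # copy the matched doc's ip onto the event
  }
}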


The translate filter is not working.

I am using Packetbeat to send network activity.

Here is my conf:

input {
  beats {
    port => 5044
  }
}

filter {
  translate {
    field => "dest.ip"
    destination => "malicious_IP"
    dictionary_path => '/opt/logstash/maliciousIPV4.yaml'
    override => true
  }

  translate {
    field => "source.ip"
    destination => "malicious_IP"
    dictionary_path => '/opt/logstash/maliciousIPV4.yaml'
    override => true
  }
}

output {
  elasticsearch {
    hosts => "localhost"
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}

I can see the logs coming in, and they appear in Kibana, but the translation just doesn't work.

Here is the content of the dictionary:

"216.46.173.126": "true"
"180.179.174.219": "true"
"204.77.168.241": "true"
"65.39.197.164": "true"
"80.91.33.133": "true"
"84.208.15.12": "true"
"74.125.60.158": "true"
"8.8.8.8": "true"
"200.221.2.45": "true"
"186.232.248.40": "true"

The translate plugin is installed.

Any idea?

I think this should be: field => "[source][ip]". Packetbeat ships source.ip as a nested object, so Logstash needs the bracketed field-reference syntax rather than the dotted name.
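
The same bracket syntax applies to the other filter too; with your paths, the filter section would look something like this:

filter {
  translate {
    field => "[dest][ip]"        # bracketed reference to the nested field, not "dest.ip"
    destination => "malicious_IP"
    dictionary_path => '/opt/logstash/maliciousIPV4.yaml'
    override => true
  }

  translate {
    field => "[source][ip]"      # bracketed reference to the nested field, not "source.ip"
    destination => "malicious_IP"
    dictionary_path => '/opt/logstash/maliciousIPV4.yaml'
    override => true
  }
}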


It worked!

Thanks a lot. I spent 4 hours yesterday trying to fix this lol.
