Hi,
Here is my current filter to check malicious IP addresses in a MySQL database:
It is working fine, but I think it needs some tuning...
filter {
  if [dest_ip] and [src_ip] {
    cidr {
      add_tag => [ "internal_dest_ip" ]
      address => [ "%{dest_ip}" ]
      network => [ "169.254.0.0/16", "fe80::/64", "192.168.0.0/16", "172.16.0.0/12", "10.0.0.0/8" ]
    }
    if "internal_dest_ip" in [tags] {
      cidr {
        add_tag => [ "INTERNAL_TRAFFIC" ]
        address => [ "%{src_ip}" ]
        network => [ "169.254.0.0/16", "fe80::/64", "192.168.0.0/16", "172.16.0.0/12", "10.0.0.0/8" ]
      }
      mutate {
        remove_tag => ["internal_dest_ip"]
      }
    }
    if "INTERNAL_TRAFFIC" not in [tags] {
      jdbc_streaming {
        jdbc_driver_library => "/usr/share/logstash/bin/mysql-connector-java-5.1.42-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/db"
        jdbc_user => "script"
        jdbc_password => "blablablalbla"
        statement => "select malicious from maliciousIP where ip= :ipdest or ip= :ipsrc"
        parameters => { "ipdest" => "[dest_ip]" "ipsrc" => "[src_ip]" }
        target => "malicious_IP"
        enable_metric => "false"
        use_cache => "true"
        cache_expiration => "300"
        cache_size => "10000"
      }
      mutate {
        copy => { "[malicious_IP][0][malicious]" => "maliciousIP" }
        remove_field => ["malicious_IP"]
      }
    }
  }
}
First I check the dest_ip for private IP addresses and tag it accordingly. Then I check the source. If the filter also finds a private IP address (both source and dest), I tag it with "Internal Traffic". This is the strategy I implemented to avoid querying internal traffic.
Is there any other way to check both src_ip and dest_ip simultaneously?
I tried to use address => [ "%{dest_ip}" , "%{src_ip}" ] inside the cidr filter, but this applies OR logic, not AND; I only want to tag the traffic when both src_ip and dest_ip are in the private address space.
My second question is regarding the jdbc_streaming plugin.
Is there any way to manipulate the query using some sort of conditional?
My solution is checking both source and destination at the same time in the query. The problem is that I am querying the same IP address whenever the source or destination changes. For example, I query
src 192.168.0.1 and dst 2.2.2.2, and Logstash then caches the query result for the time I configured in the cache_expiration parameter.
If another packet goes from 192.168.0.55 to 2.2.2.2, I will query 192.168.0.55 and 2.2.2.2 again.
I thought of creating some sort of conditional inside the jdbc_streaming filter. Something like this:
jdbc_streaming {
  jdbc_driver_library => "/usr/share/logstash/bin/mysql-connector-java-5.1.42-bin.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/db"
  jdbc_user => "script"
  jdbc_password => "blablablalbla"
  if "INTERNAL_SOURCE" in [tags] {
    statement => "select malicious from maliciousIP where ip= :ipdest"
    parameters => { "ipdest" => "[dest_ip]" }
  } else if "EXTERNAL_SOURCE" in [tags] {
    statement => "select malicious from maliciousIP where ip= :ipsrc"
    parameters => { "ipsrc" => "[src_ip]" }
  }
  target => "malicious_IP"
  enable_metric => "false"
  use_cache => "true"
  cache_expiration => "300"
  cache_size => "10000"
}
Is there any way to implement a conditional inside a filter? Do you see any other way to do what I am trying to do here?
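The logic the post describes, written out as a self-contained Python example (added here; it is not the poster's pipeline and not Logstash code). It uses the same five private ranges and the same maliciousIP table layout and column names as the post, with an in-memory SQLite table standing in for the MySQL database and constructed sample rows. Two points it makes concrete: the "both ends private" check is a single AND over the two addresses rather than two chained cidr stages, and keying the lookup cache by individual IP instead of by (src, dest) pair means a second flow to 2.2.2.2 from a different internal host costs no new query. It also goes one step beyond the post's INTERNAL_SOURCE/EXTERNAL_SOURCE split by handling the case where both ends are external (each external IP is looked up separately). The cache eviction rule, the `_ip_parse_failure` tag and the `queries` counter are assumptions for the example.

```python
import ipaddress
import sqlite3
import time

# Same ranges as the post's cidr filters.
PRIVATE_NETWORKS = [ipaddress.ip_network(n) for n in (
    "169.254.0.0/16", "fe80::/64", "192.168.0.0/16", "172.16.0.0/12", "10.0.0.0/8")]


def is_private(ip):
    """True if ip falls in one of PRIVATE_NETWORKS (v4/v6 mismatches are simply non-matches)."""
    addr = ipaddress.ip_address(ip)
    return any(addr in net for net in PRIVATE_NETWORKS)


class MaliciousIpLookup:
    """Per-IP cached lookup against the maliciousIP table; mirrors cache_expiration/cache_size."""

    def __init__(self, conn, cache_expiration=300, cache_size=10000, clock=time.monotonic):
        self.conn = conn
        self.cache_expiration = cache_expiration
        self.cache_size = cache_size
        self.clock = clock
        self.cache = {}   # ip -> (expires_at, value of the malicious column or None)
        self.queries = 0  # SQL round trips actually made (assumed counter, for inspection)

    def _query(self, ip):
        self.queries += 1
        row = self.conn.execute(
            "select malicious from maliciousIP where ip = :ip", {"ip": ip}).fetchone()
        return None if row is None else row[0]

    def lookup(self, ip):
        now = self.clock()
        hit = self.cache.get(ip)
        if hit is not None and hit[0] > now:
            return hit[1]
        result = self._query(ip)
        if len(self.cache) >= self.cache_size:
            # Assumed eviction rule: drop the entry that expires soonest.
            soonest = min(self.cache, key=lambda k: self.cache[k][0])
            del self.cache[soonest]
        self.cache[ip] = (now + self.cache_expiration, result)
        return result


def enrich(event, lookup):
    """Tag internal flows; for everything else look up only the external end(s)."""
    src, dest = event.get("src_ip"), event.get("dest_ip")
    if not (src and dest):
        return event
    try:
        external = [ip for ip in (src, dest) if not is_private(ip)]
    except ValueError:
        event.setdefault("tags", []).append("_ip_parse_failure")  # assumed tag name
        return event
    if not external:  # both ends private: the AND the post asks for
        event.setdefault("tags", []).append("INTERNAL_TRAFFIC")
        return event
    hits = [r for r in (lookup.lookup(ip) for ip in external) if r is not None]
    if hits:
        event["maliciousIP"] = hits[0]  # same target field as the post's mutate/copy
    return event


def build_sample_db():
    """Constructed stand-in for the MySQL table, same table and column names as the post."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table maliciousIP (ip text primary key, malicious text)")
    conn.executemany("insert into maliciousIP values (?, ?)",
                     [("2.2.2.2", "botnet_c2"), ("198.51.100.9", "scanner")])
    conn.commit()
    return conn


def run_demo(clock=time.monotonic):
    """Run four constructed flows through the pipeline; returns (events, sql queries made)."""
    lookup = MaliciousIpLookup(build_sample_db(), clock=clock)
    flows = [
        {"src_ip": "192.168.0.1", "dest_ip": "2.2.2.2"},    # post's first example: 1 query
        {"src_ip": "192.168.0.55", "dest_ip": "2.2.2.2"},   # same external peer: cache hit
        {"src_ip": "10.1.2.3", "dest_ip": "172.16.5.5"},    # both private: tagged, never queried
        {"src_ip": "198.51.100.9", "dest_ip": "8.8.8.8"},   # both external: 2 queries
    ]
    return [enrich(dict(f), lookup) for f in flows], lookup.queries


if __name__ == "__main__":
    events, queries = run_demo()
    for e in events:
        print(e)
    print("SQL queries made:", queries)  # 3 for the four flows above
```

With the pair-keyed query from the post, the first, second and fourth flows would each be a separate round trip to MySQL; keyed per IP, the second flow is served from the cache and only three queries are made in total.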