Logstash CPU Problem

Hey guys,

I am ingesting firewall logs into Logstash via a TCP input and am running into an issue. Event logs (e.g. VPN) and UTM logs arrive normally, but when I enable traffic logs (which jumps the volume from 20 eps to 3k eps), Logstash's CPU consumption climbs to 100%, and from time to time it loses the connection to the source and drops some logs. So far all pipelines (3 in total) run with the memory queue and work normally; the problems only begin after I enable the sending of traffic logs.

Today there is a machine dedicated to Logstash (8 vCPU, 600 GB disk, 32 GB memory). The Logstash settings are the installation defaults (with the exception of the JVM heap, which I increased to 8 GB). I have some questions:

If I enable persistent queue, will I be able to remedy this problem? If so, is there a way to enable it only for specific pipelines instead of all?

Will I need to use a service like Redis or Kafka?

Logstash CPU issues are in most cases related to filter configurations or an excessive number of errors in the output.

For example, if you have a grok filter with multiple patterns that fail for the majority of events, or if you are receiving a lot of 400 errors from your Elasticsearch endpoint, both can lead to increased CPU usage by Logstash.

Since you said that this happens when you enable traffic logs on your firewall, I'm assuming that you have a dedicated pipeline to receive your firewall logs. Can you share your complete Logstash configuration? Also, can you share your pipelines.yml file?

I doubt it. Persistent queues are used for resilience, and they can sometimes even increase CPU usage because of the extra I/O needed to write to and read from disk.

Yes, you can configure PQ per pipeline.
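
For example, something like this in pipelines.yml would enable the PQ only where you need it; the pipeline IDs and paths below are just placeholders, not taken from your setup:

- pipeline.id: firewall-traffic            # placeholder id, the high-volume pipeline
  path.config: "/etc/logstash/conf.d/firewall-traffic.conf"
  queue.type: persisted
  queue.max_bytes: 4gb
- pipeline.id: firewall-events             # placeholder id, stays on the memory queue
  path.config: "/etc/logstash/conf.d/firewall-events.conf"
  queue.type: memory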

It depends on the use case. There are some things you can improve in Logstash first, but sometimes a Kafka cluster is needed; I would not use Redis in this case.
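
If it does come to that, the usual pattern is to keep the TCP-receiving pipeline as light as possible and let Kafka buffer the events for a separate processing pipeline. A minimal sketch, with a placeholder broker address and topic name:

# receiving pipeline: accept the TCP stream and hand it off to Kafka as fast as possible
output {
  kafka {
    bootstrap_servers => "kafka01:9092"
    topic_id => "firewall-traffic"
    codec => "json"
  }
}

# processing pipeline: consume from Kafka at its own pace and do the heavy filtering
input {
  kafka {
    bootstrap_servers => "kafka01:9092"
    topics => ["firewall-traffic"]
    group_id => "logstash-firewall"
    codec => "json"
  }
}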


Hi Leandro,

My pipeline:

input {
  tcp { 
    port => 5149
    codec => "json"
  }
}

filter {
  json {
    source => "message"
    target => "parsedjson"
  }

  geoip {
    source => "src"
    target => "srcgeo"
    fields => [country_name, country_code2]
  }  

  ruby {
    code => '
      require "ipaddr"
      require "json"
      subnets = JSON.parse(File.read("/etc/logstash/dicionarioorigem.json"))

      ip = IPAddr.new(event.get("src"))

      subnets.each do |subnet, data|
        if IPAddr.new(subnet).include?(ip)
          event.set("matriz_origem", data["matriz_origem"])
          event.set("filial_origem", data["filial_origem"])
          event.set("pa_origem", data["pa_origem"])
          event.set("[srcgeo][geo][country_iso_code]", data["country_iso_code"])
          event.set("[srcgeo][geo][country_name]", data["country_name"])
          break
        end
      end
     '
  }

  geoip {
    source => "dst"
    target => "dstgeo"
    fields => [country_name, country_code2]
  }

  ruby {
    code => '
      require "ipaddr"
      require "json"
      subnets = JSON.parse(File.read("/etc/logstash/dicionariodestino.json"))

      ip = IPAddr.new(event.get("dst"))

      subnets.each do |subnet, data|
        if IPAddr.new(subnet).include?(ip)
          event.set("matriz_destino", data["matriz_destino"])
          event.set("filial_destino", data["filial_destino"])
          event.set("pa_destino", data["pa_destino"])
          event.set("[dstgeo][geo][country_iso_code]", data["country_iso_code"])
          event.set("[dstgeo][geo][country_name]", data["country_name"])
          break
        end
      end
     '
  }

  mutate {
    remove_field => ["name", "version", "type", "isoTimeFormat"]
  }
}

output {
  elasticsearch {
    hosts => ["https://192.168.xxx.xxx:9200"]
    index => "firewall-traffic"
    user => "user"
    password => "password"    
  }
}

I enrich the IP data with Ruby to identify the organization's origin/destination based on subnets (around 3,000), in addition to enriching it with GeoIP data (internal IPs are also handled in Ruby).

I would say that the ruby filters are probably the cause of your high CPU usage.

Can you share a sample message that you are receiving in Logstash and a sample message of your dictionary file?

Also, is there any reason to enrich the data using a custom ruby filter instead of the native translate filter?

You can redact any sensitive information, but if possible share some reproducible messages.

Logstash message:

{
  "filial_origem": "filial1",          // added with Ruby
  "Destination Interface": "LAN",
  "Service Name": "HTTPS",
  "src": "192.168.xxx.xxx",
  "Action": "close",
  "pa_origem": "123",                  // added with Ruby
  "srcPort": "56271",
  "dst": "10.210.xxx.xxx",
  "dstPort": "443",
  "matriz_origem": "matriz1",          // added with Ruby
  "@timestamp": "2023-11-09T21:42:34.153524086",
  "dstgeo": 0,
  "Type": "traffic",
  "Subtype": "forward",
  "DevType": "Router",
  "srcgeo": {
    "geo": {
      "country_name": "Brazil",
      "country_iso_code": "BR"
    }
  }
}

Ruby dictionary:


{
  "10.1.0.0/24": {
    "matriz_origem": "matriz2",
    "filial_origem": "filial2",
    "pa_origem": "343",
    "srcgeo.geo.country_iso_code": "BR",
    "srcgeo.geo.country_name": "Brazil"
  },
  "192.168.0.0/24": {
    "matriz_origem": "matriz1",
    "filial_origem": "filial1",
    "pa_origem": "123",
    "srcgeo.geo.country_iso_code": "BR",
    "srcgeo.geo.country_name": "Brazil"
  },
  ... +3,000 more entries
}

I used Ruby because it seemed to me a simpler option to work with an extensive dictionary (more than 3,000 subnets) and multiple enrichments without needing several filters, but I'm open to suggestions.

You are parsing the JSON dictionaries once for each event, and that is going to be very expensive. Parse them in the init option, not the code option!

If you need to detect changes in the dictionary and reload it, then definitely migrate to a translate filter.
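
As a rough sketch of the init approach, reusing the path and field names from your own configuration (the lookup logic stays the same, only the parsing moves out of the per-event code):

ruby {
  init => '
    # runs once when the filter is loaded, so the dictionary is parsed a single time
    require "ipaddr"
    require "json"
    @subnets = JSON.parse(File.read("/etc/logstash/dicionarioorigem.json"))
  '
  code => '
    # @subnets set in init is visible here; note the file is not re-read if it changes
    ip = IPAddr.new(event.get("src"))
    @subnets.each do |subnet, data|
      if IPAddr.new(subnet).include?(ip)
        event.set("matriz_origem", data["matriz_origem"])
        event.set("filial_origem", data["filial_origem"])
        event.set("pa_origem", data["pa_origem"])
        event.set("[srcgeo][geo][country_iso_code]", data["country_iso_code"])
        event.set("[srcgeo][geo][country_name]", data["country_name"])
        break
      end
    end
  '
}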

So I can achieve the same goal using the translate filter? What would you recommend to me based on the data?

I need this enrichment on each event to assist the forensic process of the cybersecurity analysts who respond to incidents.

You cannot do it directly with a translate filter. I would start by moving the JSON parsing to the init option of the ruby filter (making sure the scope of the parsed data is right).

You are going to have to do a lot of .include? calls in any case, and that is going to be expensive. If you can build a custom MMDB database then you can do the lookup in a geoip filter, which will do caching and check recently seen subnets first. This is a major optimization because there are often many entries for the same IP in a short section of log.

If you can use the custom DB to tag a subnet id to the event then you could use a translate filter to enrich additional fields based on that id.
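
Hypothetically, that last step could look something like this (it assumes an upstream lookup, e.g. against the custom MMDB, has already written a [subnet_id] field, and the dictionary path is made up):

filter {
  translate {
    source => "[subnet_id]"
    target => "[@metadata][site]"
    dictionary_path => "/etc/logstash/subnet_ids.yml"
    refresh_interval => 300
  }
}

You would then parse the target field with a json or csv filter to split it into the individual enrichment fields.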

Thanks for the suggestions. I have some doubts: In the case of Ruby, I preferred to use subnets because creating a dictionary with all the IPs present would be very costly.

In the case of the Ruby filter, it takes the value of src or dst (e.g. 192.168.0.12) and enriches it with the pertinent information if that IP falls within the corresponding subnet (e.g. 192.168.0.0/24). By setting up a custom MMDB database, can geoip work with this same logic?

Regarding the custom database option, which one would you recommend? I had already thought about using memcached, but since I don't have the expertise, I didn't proceed.

These questions are more about weighing the different alternatives, because I've been struggling with these difficulties for a while.

When I referred to a custom DB I was talking about the custom MMDB database that I suggested.

Are all your subnets /24? If not, how are you dealing with overlapping IP addresses?

If your subnets are all /24, you can do this enrichment using a translate filter with a dictionary like this:

'^10\.1\.0\.[0-9]{1,3}$': '{ "matriz_origem": "matriz2", "filial_origem": "filial2", "pa_origem": "343", "src_country_iso_code": "BR", "src_country_name": "Brazil"}'

Then the following filters will enrich your data, parse the JSON from the enrichment dictionary, and rename the two src_country_* fields.

filter {
  translate {
    source => "ipField"
    target => "[@metadata][translateData]"
    dictionary_path => "/opt/logstash/pipelines/ips.yml"
    refresh_interval => 300
    regex => true
    exact => true
  }
  json {
    source => "[@metadata][translateData]"
  }
  mutate {
    rename => {
      "src_country_iso_code" => "[srcgeo][geo][country_iso_code]"
      "src_country_name" => "[srcgeo][geo][country_name]"
    }
  }
}

But I would prefer to use a dictionary like this:

'^10\.1\.0\.[0-9]{1,3}$': "matriz2,filial2,343,BR,Brazil"

And parse it with a csv filter:

filter {
  translate {
    source => "ipField"
    target => "[@metadata][translateData]"
    dictionary_path => "/opt/logstash/pipelines/ips.yml"
    refresh_interval => 300
    regex => true
    exact => true
  }
  csv {
    source => "[@metadata][translateData]"
    separator => ","
    columns => ["[matriz_origem]","[filial_origem]","[pa_origem]","[srcgeo][geo][country_iso_code]","[srcgeo][geo][country_name]"]
  }
}

Yes, all subnets are /24. I will perform the tests in the environment with all the suggestions and give feedback later. Thank you all in advance!

Your suggestion works like a charm, thank you very much!

Hi,

This may or may not be relevant to your situation but might be helpful to know when it comes to processing larger numbers of events.

In Logstash v8.6.2+ one of the underlying libraries (Netty) had a change to its default configuration. It may not have been a problem for everyone, but it did cause a lot of confusion when larger volumes were no longer processed as they had been previously.

The default value in v8.6.2+ is confirmed as -Dio.netty.allocator.maxOrder=6 (which works out to 512 KB per allocation chunk).

Earlier versions of Logstash used a different value, which meant higher volumes of events were processed more efficiently:

-Dio.netty.allocator.maxOrder=11 (roughly 16 MB per allocation chunk, which was the older library default)

Apply the setting in the jvm.options file if you want the higher-throughput behaviour again :slight_smile:
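
In other words, one line in the jvm.options file (config/jvm.options, or /etc/logstash/jvm.options on package installs), followed by a Logstash restart:

# restore the pre-8.6.2 Netty allocator chunk size
-Dio.netty.allocator.maxOrder=11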

Some more detail on the Netty default change that occurred in 8.6.2:
https://netty.io/news/2022/03/10/4-1-75-Final.html
