Scale Logstash config for more than 20,000 messages per minute?

Hi
I'm using rsyslog as a syslog server to receive logs from the firewall and network devices; it then sends the logs to Logstash (on the same host) for filtering, and Logstash pushes them to Elastic (on another host).

Forwarding around 10,000 events per minute is no problem, but when I increase the volume to around 20,000 events per minute, log processing becomes very slow...

My pipeline settings in logstash.yml:

pipeline.workers: 20
pipeline.batch.size: 200
pipeline.batch.delay: 5
queue.checkpoint.writes: 4096

My jvm.options:

-Xms24g
-Xmx28g
-Xss64M

My rsyslog.conf:

#### MODULES ####

# The imjournal module below is now used as a message source instead of imuxsock.
$ModLoad imuxsock # provides support for local system logging (e.g. via logger command)
$ModLoad imjournal # provides access to the systemd journal
#$ModLoad imklog # reads kernel messages (the same are read from journald)
#$ModLoad immark  # provides --MARK-- message capability
$IMUXSockRateLimitInterval 0 # turn off IMUXSock rate limit interval
$IMJournalRatelimitInterval 0 # turn off IMJournal rate limit interval
$IMJournalRatelimitBurst 20000

# Set the maximum number of messages per second
$SystemLogRateLimitInterval 1
$SystemLogRateLimitBurst 20000
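
(The forwarding rule to Logstash is not shown above; it is roughly an omfwd action along these lines — a sketch only, where the JSON template name is illustrative, since the Logstash tcp input uses the json codec:)

*.* action(type="omfwd" target="127.0.0.1" port="10514" protocol="tcp" template="json_template")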

Thanks a lot for any help.

Should I separate the filtering onto another Logstash host?
I'm filtering Palo Alto logs with the tags Traffic, System, Threat, and Global-Protect.
Should I use Logstash01 for Traffic logs => Elastic and Logstash02 for the remaining log types => Elastic?

I've been trying to find a multi-instance Logstash deployment model, but no luck...

Thanks in advance

I tried changing some settings as below, but it didn't help much...

pipeline.workers: 20
pipeline.batch.size: 1000
pipeline.batch.delay: 10

and the tcp input settings as below:

input {
  tcp {
    host => "127.0.0.1"
    port => 10514
    codec => "json"
    type => "rsyslog"
    tags => ["PAN-OS_syslog"]
    tcp_keep_alive => true
  }
}  

kernel settings in the OS:

net.core.wmem_max=655360
net.core.rmem_max=26214400
net.ipv4.tcp_wmem = 4096  16384  655360
net.ipv4.tcp_rmem = 4096  87380  26214400

Does anyone have any ideas about this issue?
thanks a lot

Hello,

How do you know that the bottleneck is Logstash and not Elasticsearch? Most of the time the bottleneck is on the destination, which is Elasticsearch in this case.

Did you do any troubleshooting to arrive at the conclusion that the issue is in Logstash?

A couple of things:

These heap settings seem too high. The recommendation is to use no more than 8 GB as the heap for Logstash. Did you have any OOM issues that led you to increase it this far?
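
For reference, a more conventional jvm.options heap configuration looks like this (assuming ~8 GB is enough for your workload; -Xms and -Xmx are normally set to the same value so the heap does not resize at runtime):

-Xms8g
-Xmx8g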

The first thing is to identify whether the issue is in Logstash. As mentioned before, the bottleneck could be in the destination, and in that case no amount of specs on your Logstash machine will change anything.
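
One quick check is Logstash's own monitoring API, which reports per-plugin event counts and timings (assuming the default API port 9600):

curl -s 'http://localhost:9600/_node/stats/pipelines?pretty'

If a single filter accounts for most of the duration_in_millis, that is your hot spot.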

What is your output? Please share your output configuration.

Also, please share your Logstash configuration and your filters.

Thanks a lot for your response.

I think the bottleneck is in Logstash, because every time I restart Logstash it runs normally for a few minutes, and then processing slows down again...

Here is my filter:

input {
  tcp {
    host => "127.0.0.1"
    port => 10514
    codec => "json"
    type => "rsyslog"
    tags => ["PAN-OS_syslog"]
    tcp_keep_alive => true
  }
}                                                                            
		# ***NOTE on PRUNE***
		#  PA seems to add fields in patches, and some fields do not apply to most environments,
		#  so the prune command eliminates all undefined and unneeded fields to store only what your environment uses.
		#  If you want more fields to be stored, add them to the whitelist in the prune statement.
		#  This also eliminates the columnX fields in the created document.

filter {
    if "PAN-OS_syslog" in [tags] {

        # Log types are "TRAFFIC", "THREAT", "CONFIG", "SYSTEM" and "HIP-MATCH".
        # Traffic log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/traffic-log-fields.html
        # Converted field names to CIM fields where possible; allows use of CIM Traffic visualizations and reports

        if ([message] =~ /TRAFFIC/) {
            csv {
                source => "message"
                columns => [ 
                    "receive_time", "serial_number", "log_type", "content_type", "Config Version", "generated_time", "src_ip", "dest_ip", 
                    "src_translated_ip", "dest_translated_ip", "rule", "user", "dest_user", "app", "virtual_system", "src_zone", "dest_zone", 
                    "src_interface", "dest_interface", "log_profile", "Time Logged", "session_id", "repeat_count", "src_port", "dest_port", 
                    "src_translated_port", "dest_translated_port", "tcp_flag", "protocol", "action", "Bytes", "Bytes Sent", "Bytes Received", 
                    "packets", "start", "Elapsed Time", "category", "FUTURE_USE", "seqno", "action_flags", "src_location", "dest_location", 
                    "FUTURE_USE", "packets_out", "packets_in", "session_end_reason", "dg_hier_level_1", "dg_hier_level_2", "dg_hier_level_3", 
                    "dg_hier_level_4", "virtual_system_name", "dvc_name", "action_source", "srv_vm_uuid", "dest_vm_uuid", "tunnelid", "imei", 
                    "parent_session_id", "parent_start_time", "tunnel", "assoc_id", "chunks", "chunks_sent", "chunks_received", "rule_uuid", 
                    "http-2-connection", "link_change_count", "policy_id", "link_switches", "sdwan_cluster", "sdwan_device_type", 
                    "sdwan_cluster_type", "sdwan_site", "dynusergroup_name", "xff_address", "src_device_category", "src_device_profile", 
                    "src_device_model", "src_device_vendor", "src_device_os_family", "src_device_os_version", "src_hostname", "src_mac_address", 
                    "dest_device_category", "dest_device_profile", "dest_device_model", "dest_device_vendor", "dest_device_os_family", 
                    "dest_device_os_version", "dest_hostname", "dest_mac_address", "Container ID", "POD Namespace", "POD Name", 
                    "src_external_dynamic_list", "dest_external_dynamic_list", "host_id", "serial_number", "session_owner", 
                    "high_resolution_timestamp", "session_owner", "High Res Timestamp", "nssai_sst", "nssai_sd", "Subcategory of app", 
                    "Category of app", "Technology of app", "Risk of app", "Characteristic of app", "Container of app", "Tunneled app", 
                    "SaaS of app", "Sanctioned State of app", "offloaded", "flow_type", "cluster_name"
			]
            }
			# Most of the new fields in PAN-OS 10 are not enabled yet, so they are pruned; will add them to the whitelist as they become available.
			# But note, the default is to store the entire raw message so no data is lost, as you can interrogate the message field.
			# Also, some fields like rule_uuid are pruned as the rule name is available.  If you need the UUID you can see it in the message field.
            prune {
                interpolate => true
                whitelist_names => [
                    "@timestamp","message","receive_time","serial_number","log_type","content_type","generated_time","src_ip","dest_ip","src_translated_ip",
                    "dest_translated_ip","rule","user","dest_user","app","virtual_system","src_zone","dest_zone","src_interface","dest_interface",
                    "log_profile","session_id","repeat_count","src_port","dest_port","src_translated_port","dest_translated_port","protocol",
                    "action","Bytes", "Bytes Sent", "Bytes Received","packets","start","Elapsed Time","category","seqno","action_flags","src_location",
                    "dest_location","packets_out","packets_in","session_end_reason","dvc_name","action_source","tunnelid","tunnel","Technology of app"
                ]
            }

            mutate {
                convert => [ "Bytes", "integer" ]
                convert => [ "Bytes Sent", "integer" ]
                convert => [ "Bytes Received", "integer" ]
                convert => [ "Elapsed Time", "integer" ]
                convert => [ "geoip.dma_code", "integer" ]
                convert => [ "geoip.latitude", "float" ]
                convert => [ "geoip.longitude", "float" ]
                convert => [ "dest_translated_port", "integer" ]
                convert => [ "src_translated_port", "integer" ]
                convert => [ "packets", "integer" ]
                convert => [ "packets_in", "integer" ]
                convert => [ "packets_out", "integer" ]
                convert => [ "seqno", "integer" ]
               
                add_tag => [ "PAN-OS_traffic"]
            }
                              
        }
 

        # Threat log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/threat-log-fields.html
        # Converted field names to CIM fields where possible; allows use of CIM Traffic visualizations and reports
        else if ([message] =~ /THREAT/) {
            csv {
                source => "message"
                columns => [ 
                  "receive_time", "serial_number", "log_type", "Threat/Content Type", "Config Version", "generated_time", "src_ip", "dest_ip", 
                  "src_translated_ip", "dest_translated_ip", "rule", "user", "dest_user", "app", "virtual_system", "src_zone", "dest_zone", 
                  "src_interface", "dest_interface", "log_profile", "Time Logged", "session_id", "repeat_count", "src_port", "dest_port", 
                  "src_translated_port", "dest_translated_port", "tcp_flag", "protocol", "action", "threat_uri_name", "threat_id", "category", 
                  "severity", "direction", "seqno", "action_flags", "src_location", "dest_location", "FUTURE_USE", "contenttype", "pcap_id", 
                  "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient", "reportid", 
                  "dg_hier_level_1", "dg_hier_level_2", "dg_hier_level_3", "dg_hier_level_4", "virtual_system_name", "Device Name", "file_url", 
                  "srv_vm_uuid", "dest_vm_uuid", "http_method", "tunnelid", "imei", "parent_session_id", "parent_start_time", "tunnel", 
                  "thr_catagory", "contentver", "sig_flags", "assoc_id", "ppid", "http_headers", "url_catagory_list", "rule_uuid", 
                  "http-2-connection", "dynusergroup_name", "XFF address", "Source Device Category", "Source Device Profile", 
                  "Source Device Model", "Source Device Vendor", "Source Device OS Family", "Source Device OS Version", "Source Hostname", 
                  "Source Mac Address", "Destination Device Category", "Destination Device Profile", "Destination Device Model", 
                  "Destination Device Vendor", "Destination Device OS Family", "Destination Device OS Version", "Destination Hostname", 
                  "Destination Mac Address", "Container ID", "POD Namespace", "POD Name", "Source External Dynamic List", 
                  "Destination External Dynamic List", "Host ID", "Serial Number", "domain_edl", "Source Dynamic Address Group", 
                  "Destination Dynamic Address Group", "partial_hash", "High Res Timestamp", "Reason", "justification", "nssai_sst", 
                  "Subcategory of app", "Category of app", "Technology of app", "Risk of app", "Characteristic of app", "Container of app", 
                  "Tunneled app", "SaaS of app", "Sanctioned State of app", "Cloud Report ID", "cluster_name", "flow_type"
                ]
            }

            prune {
                interpolate => true
                whitelist_names => [
                    "@timestamp", "message", "receive_time", "serial_number", "log_type", "Threat/Content Type", "generated_time", "src_ip",
                    "dest_ip", "src_translated_ip", "dest_translated_ip", "rule", "user", "dest_user", "app", "virtual_system", "src_zone",
                    "dest_zone", "src_interface", "dest_interface", "log_profile", "session_id", "repeat_count", "src_port", "dest_port",
                    "src_translated_port", "dest_translated_port", "tcp_flag", "protocol", "action", "threat_uri_name", "threat_id", "category",
                    "severity", "direction", "seqno", "action_flags", "src_location", "dest_location", "contenttype", "pcap_id", "filedigest",
                    "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient", "reportid", "Source Hostname",
                    "http_method", "tunnelid", "tunnel", "thr_catagory", "contentver", "assoc_id", "ppid", "http_headers", "url_catagory_list",
                    "rule_uuid", "Device Name", "Risk of app"
                ]
            }


            mutate {
                convert => [ "geoip.dma_code", "integer" ]
                convert => [ "geoip.latitude", "float" ]
                convert => [ "geoip.longitude", "float" ]
                convert => [ "dest_translated_port", "integer" ]
                convert => [ "src_translated_port", "integer" ]
                convert => [ "SequenceNumber", "integer" ]
   
                add_tag => ["PAN-OS_threat"]
            }
            
        }
	
	#filter GlobalProtect - VPN client
    	else if ([message] =~ /GLOBALPROTECT/) {
	    csv {
		source => "message"
		columns => [
                    "receive_time", "Serial Number", "Type", "Threat/Content Type", "Config Version", "Generate Time", "Virtual System", 
                    "Event ID", "Stage", "Authentication Method", "Tunnel Type", "Source User", "Source Region", "Machine Name", 
                    "Public IP", "Public IPv6", "Private IP", "Private IPv6", "Host ID", "Serial_Number", "Client Version", "Client OS", 
                    "Client OS Version", "Repeat Count", "Reason", "Error", "Description", "Status", "Location", "Login Duration", 
                    "Connect Method", "Error Code", "Portal", "Sequence Number", "Action Flags", "High Res Timestamp", "Selection Type", 
                    "Response Time", "Priority", "Attempted Gateways", "Gateway", "DG Hierarchy Level 1", "DG Hierarchy Level 2", 
                    "DG Hierarchy Level 3", "DG Hierarchy Level 4", "Virtual System Name", "Device Name", "Virtual System ID", 
                    "cluster_name"

		]
	    }
            prune {
                interpolate => true
                whitelist_names => [
                    "@timestamp","message","receive_time","Serial Number","Type","Threat/Content Type","Config Version","Generate Time",
                    "Virtual System","Event ID","Stage","Authentication Method","Tunnel Type","Source User","Source Region","Machine Name",
                    "Public IP","Private IP","Serial Number","Client Version","Client OS","Client OS Version","Repeat Count","Reason","Error",
                    "Status","Location","Login Duration","Connect Method","Error Code","Portal","High Res Timestamp"
                ]
            }

	    mutate {
		convert => [ "Serial Number", "integer" ]
                convert => [ "Login Duration", "integer" ]

                add_tag => ["PAN-OS_GLOBALPROTECT"]
            }
        }


    	# Config log fields:  https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/config-log-fields.html
	else if ([message] =~ /CONFIG/) {
	    csv {
		source => "message"
		columns => [
			"FUTURE_USE", "receive_time", "serial_number", "Type", "Threat/Content Type", "Config Version", "generated_time", "src_ip", 
                    	"virtual_system", "Command", "Admin", "Client", "Result", "Configuration_Path", "Sequence Number", "before_change_detail", 
                    	"after_change_detail", "action_flags", "DG Hierarchy Level 1", "DG Hierarchy Level 2", "DG Hierarchy Level 3", 
                    	"Virtual System Name", "DG Hierarchy Level 4", "dvc_name", "dg_id", "comment"

		]
	    }
            prune {
                interpolate => true
                whitelist_names => [
                    "@timestamp","message","receive_time","serial_number","Type","Threat/Content Type","generated_time","src_ip","virtual_system",
                    "Command","Admin","Client","Result","Configuration_Path","before_change_detail","after_change_detail","Sequence Number","action_flags","dvc_name"
                ]
            }


	    mutate {
                add_tag => ["PAN-OS_Config"]
            }
        }

	else if ([message] =~ /SYSTEM/) {
	    csv {
		source => "message"
		columns => [
                    "receive_time", "Serial Number", "Type", "Threat/Content Type", "Config Version", "Generate Time", "Virtual System",
                    "Event ID", "Object", "fmt", "id", "module", "Severity", "Description", "Sequence Number", "Action Flags", 
                    "DG Hierarchy Level 1", "DG Hierarchy Level 2", "DG Hierarchy Level 3", "DG Hierarchy Level 4", "Virtual System Name", 
                    "Device Name", "dg_id", "tpl_id", "High Res Timestamp"

		]
	    }
            prune {
                interpolate => true
                whitelist_names => [
                    "@timestamp", "message", "receive_time", "Serial Number", "Type", "Threat/Content Type", "Generate Time", "Virtual System", "Event ID",
                    "Object", "module", "Severity", "Description", "Sequence Number", "Action Flags", "Device Name"
                ]
            }
              if ([Description] =~ /DHCP lease started/) {
                grok {
                    match => { "Description" => "%{IP:lease_ip} --> mac %{MAC:mac_address} - hostname %{WORD:src_name}" }
                    add_tag => [ "DHCP_LEASE" ]
                }
              }
					

	    mutate {
                add_tag => ["PAN-OS_System"]
            }
        }


  #  If your firewall is subject to any audits or PCI compliance, then you must leave the original RAW message in the logs,
  #  so leave this section commented out.  If not, you can uncomment this mutate section, as you do not need the unparsed
  #  raw log to be in each document.
  #
  #      mutate {
  #          # Original message has been fully parsed, so remove it.
  #          remove_field => ["message"]
  #      }

        # Geolocate logs that have src_ip and it is a non-RFC1918 address
        if [src_ip] and [src_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
                source => "src_ip"
                target => "src_geoip"
            }

            # Remove src_geoip.location if it is equal to 0,0
            if ([src_geoip][location] and [src_geoip][location] =~ "0,0") {
                mutate {
                    replace => [ "[src_geoip][location]", "" ]
                }
            }
        }

        # Geolocate logs that have dest_ip and if that dest_ip is a non-RFC1918 address
        if [dest_ip] and [dest_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
                source => "dest_ip"
                target => "dest_geoip"
            }

            # Remove dest_geoip.location if it is equal to 0,0
            if ([dest_geoip][location] and [dest_geoip][location] =~ "0,0") {
                mutate {
                    replace => [ "[dest_geoip][location]", "" ]
                }
            }
        }

        # Takes the 5-tuple of source address, source port, destination address, destination port, and protocol and does a SHA1 hash to fingerprint the flow.  This is a useful
        # way to be able to do top N terms queries on flows, not just on one field.
        if [src_ip] and [dest_ip] {
            fingerprint {
                concatenate_sources => true
                method => "SHA1"
                key => "logstash"
                source => ["src_ip", "src_port", "dest_ip", "dest_port", "protocol" ]
            }
        }
        
        # Resolve IP addresses to names.  The DNS filter can be a performance issue if receiving over 400 requests per second.
        # Do not use the Logstash DNS filter cache parameters; they force a single-threaded process regardless of how many workers you have.
        # For best performance, install dnsmasq on the Logstash server.
        # Make sure your dnsmasq points to an internal DNS server to resolve local RFC1918 addresses.
        if [src_ip] {
            mutate {
            add_field => { "src_name" => "%{src_ip}" }
            }   
	    dns {
                reverse => ["src_name"] 
                action => "replace"
                nameserver => [ "10.164.10.115" ]

            }     
       }        
       if [dest_ip] {
            mutate {
            add_field => { "dest_name" => "%{dest_ip}" }
            }   
	    dns {
                reverse => ["dest_name"] 
                action => "replace"
                nameserver => ["10.164.10.116"]

            }     
       }        

    }
}
                                                                                        
# Every single log will be forwarded to Elasticsearch. If you are using another port, you should specify it here.
output {
    if "PAN-OS_traffic" in [tags] {
        elasticsearch {
            index => "panos-traffic"
            hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
        }
    }
    else if "PAN-OS_threat" in [tags] {
        elasticsearch {
            index => "panos-threat"
            hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
       }
    }
    else if "PAN-OS_Config" in [tags] {
        elasticsearch {
            index => "panos-config"
            hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
        }
    }
    else if "PAN-OS_GLOBALPROTECT" in [tags] {
        elasticsearch {
            index => "panos-global-protect"
            hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
        }
    }	
    else if "PAN-OS_System" in [tags] {
        elasticsearch {
            index => "panos-system"
            hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
        }
    }
    else {
        elasticsearch {
            index => "panos-undefined"
            hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
        }
    }
}

Thank you

I have 3 Elastic nodes, one master and 2 data nodes; the output indexes to the data nodes.

I did check the cluster health; it is always green:

{
    "cluster_name": "elkcluster",
    "status": "green",
    "timed_out": false,
    "number_of_nodes": 3,
    "number_of_data_nodes": 3,
    "active_primary_shards": 17,
    "active_shards": 34,
    "relocating_shards": 0,
    "initializing_shards": 0,
    "unassigned_shards": 0,
    "delayed_unassigned_shards": 0,
    "number_of_pending_tasks": 0,
    "number_of_in_flight_fetch": 0,
    "task_max_waiting_in_queue_millis": 0,
    "active_shards_percent_as_number": 100
}

Yeah, but what are the specs? CPU, RAM, JVM HEAP, Disk type?

I don't think this is enough to say that the bottleneck is in Logstash.

But what does your logstash.yml look like? Are you using the Persistent Queue? Please share your entire logstash.yml file.
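
If you do want to test the persistent queue, it only takes a couple of settings in logstash.yml (the size below is illustrative):

queue.type: persisted
queue.max_bytes: 4gb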

Maybe you will need to add a new Logstash node to balance the requests; scaling up does not always solve this kind of issue.

Also, why are you sending the logs to an rsyslog server on the same host just to redirect them to Logstash? You could send them directly to Logstash and avoid the extra TCP connection.
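
For example, Logstash can receive the syslog stream directly with a udp (or tcp) input — a sketch, where the port and tags are illustrative:

input {
  udp {
    port => 5514
    type => "rsyslog"
    tags => ["PAN-OS_syslog"]
  }
}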

Yes, you are right. I've seen many topics about Logstash performance saying it needs at most around 8 GB of memory to run one Logstash instance. Currently I'm running Logstash with the memory queue; I already tried the persistent queue, but it's the same issue: log processing is slow, like in this topic: Time delay between logstash and elasticsearch - Elastic Stack / Logstash - Discuss the Elastic Stack

Here is my logstash.yml:

# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
# path.data:
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
pipeline.workers: 20
#
# How many events to retrieve from inputs before sending to filters+workers
#
pipeline.batch.size: 1000
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
pipeline.batch.delay: 10
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: Enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" automatically enables ordering if the 'pipeline.workers' setting
# is also set to '1', and disables otherwise.
# "true" enforces ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" disables any extra processing necessary for preserving ordering.
#
# pipeline.ordered: auto
#
# Sets the pipeline's default value for `ecs_compatibility`, a setting that is
# available to plugins that implement an ECS Compatibility mode for use with
# the Elastic Common Schema.
# Possible values are:
# - disabled
# - v1
# - v8 (default)
# Pipelines defined before Logstash 8 operated without ECS in mind. To ensure a
# migrated pipeline continues to operate as it did before your upgrade, opt-OUT
# of ECS for the individual pipeline in its `pipelines.yml` definition. Setting
# it here will set the default for _all_ pipelines, including new ones.
#
# pipeline.ecs_compatibility: v8
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
# config.reload.automatic: false
#
# How often to check if the pipeline configuration has changed (in seconds)
# Note that the unit value (s) is required. Values without a qualifier (e.g. 60)
# are treated as nanoseconds.
# Setting the interval this way is not recommended and might change in later versions.
#
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ API Settings -------------
# Define settings related to the HTTP API here.
#
# The HTTP API is enabled by default. It can be disabled, but features that rely
# on it will not work as intended.
#
# api.enabled: true
#
# By default, the HTTP API is not secured and is therefore bound to only the
# host's loopback interface, ensuring that it is not accessible to the rest of
# the network.
# When secured with SSL and Basic Auth, the API is bound to _all_ interfaces
# unless configured otherwise.
#
# api.http.host: 127.0.0.1
#
# The HTTP API web server will listen on an available port from the given range.
# Values can be specified as a single port (e.g., `9600`), or an inclusive range
# of ports (e.g., `9600-9700`).
#
# api.http.port: 9600-9700
#
# The HTTP API includes a customizable "environment" value in its response,
# which can be configured here.
#
# api.environment: "production"
#
# The HTTP API can be secured with SSL (TLS). To do so, you will need to provide
# the path to a password-protected keystore in p12 or jks format, along with credentials.
#
# api.ssl.enabled: false
# api.ssl.keystore.path: /path/to/keystore.jks
# api.ssl.keystore.password: "y0uRp4$$w0rD"
#
# The availability of SSL/TLS protocols depends on the JVM version. Certain protocols are
# disabled by default and need to be enabled manually by changing `jdk.tls.disabledAlgorithms`
# in the $JDK_HOME/conf/security/java.security configuration file.
#
# api.ssl.supported_protocols: [TLSv1.2,TLSv1.3]
#
# The HTTP API can be configured to require authentication. Acceptable values are
#  - `none`:  no auth is required (default)
#  - `basic`: clients must authenticate with HTTP Basic auth, as configured
#             with `api.auth.basic.*` options below
# api.auth.type: none
#
# When configured with `api.auth.type` `basic`, you must provide the credentials
# that requests will be validated against. Usage of Environment or Keystore
# variable replacements is encouraged (such as the value `"${HTTP_PASS}"`, which
# resolves to the value stored in the keystore's `HTTP_PASS` variable if present
# or the same variable from the environment)
#
# api.auth.basic.username: "logstash-user"
# api.auth.basic.password: "s3cUreP4$$w0rD"
#
# When setting `api.auth.basic.password`, the password should meet
# the default password policy requirements.
# The default password policy requires non-empty minimum 8 char string that
# includes a digit, upper case letter and lower case letter.
# Policy mode sets Logstash to WARN or ERROR when HTTP authentication password doesn't
# meet the password policy requirements.
# The default is WARN. Setting to ERROR enforces stronger passwords (recommended).
#
# api.auth.basic.password_policy.mode: WARN
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have a label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
# queue.type: memory
# queue.type: persisted
#
# If `queue.type: persisted`, the directory path where the pipeline data files will be stored.
# Each pipeline will group its PQ files in a subdirectory matching its `pipeline.id`.
# Default is path.data/queue.
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 2048mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 1000
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
# queue.max_bytes: 4096mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 4096
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
queue.checkpoint.writes: 4096
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 4000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
# being available to be read by the dead_letter_queue input when items are written infrequently.
# Default is 5000.
#
# dead_letter_queue.flush_interval: 5000

# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.
# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.
# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.
#
# dead_letter_queue.storage_policy: drop_newer

# If using dead_letter_queue.enable: true, the interval that events have to be considered valid. After the interval has
# expired the events could be automatically deleted from the DLQ.
# The interval could be expressed in days, hours, minutes or seconds, using as postfix notation like 5d,
# to represent a five days interval.
# The available units are respectively d, h, m, s for day, hours, minutes and seconds.
# If not specified then the DLQ doesn't use any age policy for cleaning events.
#
# dead_letter_queue.retain.age: 1d

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
# log.level: info
#
# Options for log.format:
#   * plain (default)
#   * json
#
# log.format: plain
# path.logs:
#
# ------------ Other Settings --------------
#
# Allow or block running Logstash as superuser (default: true)
# allow_superuser: false
#
# Where to find custom plugins
# path.plugins: []
#
# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name
# Default is false
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.proxy: ["http://proxy:port"]
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.monitoring.elasticsearch.api_key: "id:api_key"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.monitoring.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
# use either keystore.path/keystore.password or certificate/key configurations
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.certificate: /path/to/file
#xpack.monitoring.elasticsearch.ssl.key: /path/to/key
#xpack.monitoring.elasticsearch.ssl.verification_mode: full
#xpack.monitoring.elasticsearch.ssl.cipher_suites: []
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.proxy: ["http://proxy:port"]
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx
#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.management.elasticsearch.api_key: "id:api_key"
#xpack.management.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.management.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
# use either keystore.path/keystore.password or certificate/key configurations
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.certificate: /path/to/file
#xpack.management.elasticsearch.ssl.key: /path/to/certificate_key_file
#xpack.management.elasticsearch.ssl.cipher_suites: []
#xpack.management.elasticsearch.ssl.verification_mode: full
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s

# X-Pack GeoIP Database Management
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-manage_update
#xpack.geoip.downloader.enabled: true
#xpack.geoip.downloader.endpoint: "https://geoip.elastic.co/v1/database"

My VM config:
Logstash host: 24 vCPU, 32 GB memory, 120 GB HDD
Elastic hosts 1, 2, 3: 12 vCPU, 32 GB memory, 700 GB HDD => I set all nodes as data nodes together. I can see the Java heap size when Elastic starts up is 16 GB on each node.

Thanks a lot @leandrojmp
Trung

I will change this: I will stop rsyslog and use Logstash as the syslog receiver and filter the logs to the Elastic cluster. I will let you know when it's done.

thanks a lot @leandrojmp

@leandrojmp I can see the connections between the 3 Elastic nodes have high latency, around 20-40 ms. Does this impact the Elastic cluster's performance?

I'm not sure; you didn't answer whether you are using persistent queues in your Logstash pipelines. Can you share your pipelines.yml as well?

Also, you said this about your elasticsearch nodes:

HDD 700Gb

HDD can be pretty bad for Elasticsearch performance; it is recommended to use at least some kind of SSD.

To really know whether Elasticsearch is the bottleneck, you would need to troubleshoot it and also try to tune it for indexing speed.

There are a couple of topics already about it.


@leandrojmp, sorry... there was a duplicate reply from my side, that's why I'm not sure what was already provided.
I already use the persistent queue, but it gives the same result. The log status is the same as in this topic: Time delay between logstash and elasticsearch

The Elastic hosts are running on SSD, of course.

Here is my Logstash pipelines.yml:

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
  pipeline.workers: 20

I will try to tune for indexing speed as you recommended.

Thanks a lot
Trung

20k/min = ~330/s

I would think Logstash should handle that with no problem, especially with your hardware.

There is a bit of processing in that Logstash pipeline, but it's nothing too intensive as far as I can see.

Have you looked at threads in elasticsearch?

Have you tried commenting out the elasticsearch output and having Logstash just write to a file, to see if it keeps up?
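
Something like this in place of the elasticsearch output would do it (the path is illustrative):

output {
  file {
    path => "/var/log/logstash/throughput-test.log"
    codec => "json_lines"
  }
}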


The geoip filter is documented as being "surprisingly expensive", and the dns filter documentation says "use of this plugin can significantly slow down your pipeline's throughput if you have a high latency network. Caching can greatly improve things, but doing a never-seen-before dns lookup at a server on the other side of the planet can take hundreds of milliseconds."

I totally agree with @stephenb that you should replace the elasticsearch output and measure the throughput with a file output. You have to measure whether the issue is in logstash or elasticsearch.

I suggest you then try removing the geoip/dns filters and see what throughput you get without them. That should help you understand better where the bottleneck is.

If logstash is fast initially and then slows down, that might be because either the geoip or dns filter caches fill up. Both filters provide options to adjust cache sizes, and the dns filter can cache negative results.
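
For example, on the dns filter that tuning might look something like this (the cache sizes and TTLs are illustrative; adjust them to your traffic):

dns {
  reverse => ["src_name"]
  action => "replace"
  nameserver => [ "10.164.10.115" ]
  hit_cache_size => 8000        # cache successful lookups
  hit_cache_ttl => 300
  failed_cache_size => 8000     # also cache negative results
  failed_cache_ttl => 60
}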


@leandrojmp @stephenb @Badger
Yes, all your comments were correct.
First: I disabled the rsyslog service and now push all logs from the firewall to Logstash via the udp input plugin, running with a persistent queue as recommended.
Second: I rebuilt the whole Elastic cluster with 3 nodes of minimal resources on CentOS 7 (it was on Windows 2022 before :smiley: )
Now everything runs smoothly, no bottleneck anymore.
@leandrojmp I will try the indexing-speed tuning soon.

Thank you so much for all your recommendations.
Trung
