What are all the ways I can optimize my Logstash filters?

Hi,

I am using ELK GA 5.0.0. I have a Logstash instance that consumes from 3 different Kafka topics. I have a filter like the one below:

filter {
	# Route first on the top-level [logtype], then on the per-topic
	# [fields][logtype] value set by the Kafka inputs.
	#
	# Fix 1: the original had the line `if [logtype] == "LOGFILE" {` duplicated,
	# which left the braces unbalanced (the filter block was never closed) and
	# mis-nested the else-if chain.
	# Fix 2: positional fields that were captured as skip1..skip6 and then
	# removed with mutate/remove_field are now matched with bare %{GREEDYDATA}
	# (no capture name), so they are never created in the first place — same
	# resulting event, less grok work per message.
	if [logtype] == "LOGFILE" {
		if [fields][logtype] == "logfile_a" {
			grok {
				match => { "message" => "####<(?<timestamp>%{MONTH} %{MONTHDAY}, 20%{YEAR} %{HOUR}:?%{MINUTE}(?::?%{SECOND}) (?:AM|PM) (?:GMT))\> <%{GREEDYDATA:field_a}> <%{GREEDYDATA:field_b}> <%{GREEDYDATA}> <%{GREEDYDATA:field_c}> <%{GREEDYDATA}> <%{GREEDYDATA}> <%{GREEDYDATA}> <%{GREEDYDATA:field_d}> <%{GREEDYDATA}> <%{GREEDYDATA}> <%{GREEDYDATA:field_e}>" }
			}
			date {
				match => [ "timestamp", "MMM dd, YYYY hh:mm:ss aa z" ]
				timezone => "GMT"
				target => "@timestamp"
			}
		} else if [fields][logtype] == "logfile_b" {
			grok {
				match => { "message" => "%{NOTSPACE:field_a} %{NOTSPACE:field_b} ('%{NOTSPACE:field_c}'|%{NOTSPACE:field_c}) %{NOTSPACE:field_d} %{NOTSPACE:field_e} %{NOTSPACE:field_f} %{NOTSPACE:field_g} %{NOTSPACE:field_h} %{NOTSPACE:field_i} %{NOTSPACE:field_j} ('%{NOTSPACE:field_k}'|%{NOTSPACE:field_k}) %{NOTSPACE:field_l} ('%{GREEDYDATA:field_m}'|%{NOTSPACE:field_m}) ('%{GREEDYDATA:field_n}'|%{NOTSPACE:field_n}) ('%{GREEDYDATA:field_o}'|%{NOTSPACE:field_o}) ('%{GREEDYDATA:field_p}'|%{NOTSPACE:field_p})" }
			}
			# "-" is the source's placeholder for a missing numeric value;
			# normalize to -1 so the field indexes as a number downstream.
			if [field_d] == "-" {
				mutate {
					replace => [ "field_d", "-1" ]
				}
			}
			if [field_f] == "-" {
				mutate {
					replace => [ "field_f", "-1" ]
				}
			}
			# Tag test/noise records so a later stage can drop or filter them.
			if [field_k] == "testdata_k" {
				mutate {
					add_tag => [ "unwanted_log"]
				}
			}
			mutate {
				# Recombine the split date + time columns for the date filter.
				add_field => {
					"logtime" => "%{field_a} %{field_b}"
				}
				remove_field => [ "field_a", "field_b" ]
				gsub => ["field_g","\'",""]
			}
			date {
				match => [ "logtime", "YYYY-MM-dd HH:mm:ss" ]
				timezone => "GMT"
				target => "@timestamp"
			}
		}
	} else if [fields][logtype] == "logfile_c" {
		# NOTE(review): the original tested [logtype] == "logfile_c" here, but
		# [logtype] is the field that holds "LOGFILE" in the branch above while
		# the per-file discriminators live in [fields][logtype] — so that branch
		# could likely never match. Changed to [fields][logtype] for consistency;
		# confirm against the Kafka input configuration.
		grok {
			match => { "message" => "^##(?<level>(levelA|levelB|levelC|levelD|levelE)) (?<timestamp>%{MONTHDAY}-%{MONTH}-%{YEAR} %{TIME}) %{NOTSPACE:field_a} %{NOTSPACE:field_b} %{GREEDYDATA:field_c}" }
		}
		date {
			match => [ "timestamp", "dd-MMM-yyyy HH:mm:ss.SSS" ]
			timezone => "UTC"
			target => "@timestamp"
		}
	}
}

The problem is, I get a CPU spike even with an average of only 50 events per second, and I sometimes get an error about excessively long grok patterns. It would be really helpful if someone could tell me why this is happening and how to fix it.

Suggestions:

  • You have two `if [logtype] == "LOGFILE" {` lines one after another, which unbalances the braces and breaks the conditional nesting.
  • Don't use so many GREEDYDATA patterns. Use more exact patterns.
  • Instead of capturing fields into skip1 etc and deleting them afterwards, don't capture them in the first place (e.g. use %{GREEDYDATA} instead of %{GREEDYDATA:skip1}).

Hi, will try your suggestions. @magnusbaeck is there any reference page for logstash field datatypes?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.