Help with a logstash instance processing 4000 messages/s


(Laverio) #1

Hi, I'm suffering a serious tuning issue processing messages from a kafka queue.

The architecture is the following:

ASA Firewall -> logstash input -> kafka queue (32 partitions) -> logstash output (4 workers) -> elastic search

On a 4 vCpu 32gb vmware virtual machine (CentOS 7 OS).

The firewall produces an average flow of 3800 mess/s that have to be stored, read, PARSED and loaded into ES, and the performance issue is obiviously related to parsing with 14 matches to try for each message.

I've tried with anchors, splitting and any other performance tuning but the average output is "only" 4200 messages/s and the logstash process is using about 270% cpu instead of a full 400% (even on a 8 cores testing vm ).

I have to reach an average output of 6000 messages/s to empty the kafka queue in case of emergency.

Any suggestions?


(Nick George) #2

Hi Laverio,

What's your iowait sitting at?


(Laverio) #3

Unfortunately there's no I/O waiting, but java sits on its 400% even with 8 worker threads.

top - 12:17:42 up 2:48, 3 users, load average: 3,62, 3,54, 3,62
Tasks: 160 total, 1 running, 159 sleeping, 0 stopped, 0 zombie
%Cpu0 : 47,5 us, 2,0 sy, 0,0 ni, 48,8 id, 0,0 wa, 0,0 hi, 1,7 si, 0,0 st
%Cpu1 : 41,2 us, 1,4 sy, 0,0 ni, 56,8 id, 0,0 wa, 0,0 hi, 0,7 si, 0,0 st
%Cpu2 : 35,2 us, 1,4 sy, 0,0 ni, 63,5 id, 0,0 wa, 0,0 hi, 0,0 si, 0,0 st
%Cpu3 : 33,8 us, 1,0 sy, 0,0 ni, 65,2 id, 0,0 wa, 0,0 hi, 0,0 si, 0,0 st
%Cpu4 : 47,0 us, 1,7 sy, 0,0 ni, 51,0 id, 0,0 wa, 0,0 hi, 0,3 si, 0,0 st
%Cpu5 : 44,4 us, 1,0 sy, 0,0 ni, 54,6 id, 0,0 wa, 0,0 hi, 0,0 si, 0,0 st
%Cpu6 : 36,3 us, 0,7 sy, 0,0 ni, 63,1 id, 0,0 wa, 0,0 hi, 0,0 si, 0,0 st
%Cpu7 : 41,5 us, 1,3 sy, 0,0 ni, 57,2 id, 0,0 wa, 0,0 hi, 0,0 si, 0,0 st
KiB Mem: 32947724 total, 3021352 used, 29926372 free, 62440 buffers
KiB Swap: 0 total, 0 used, 0 free. 2099136 cached Mem

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2764 root 20 0 6826496 628376 19108 S 342,2 1,9 324:46.39 java
9 root 20 0 0 0 0 S 0,3 0,0 0:00.27 rcuos/0
2494 root 20 0 25128 2944 2392 R 0,3 0,0 0:09.68 top
2889 root 20 0 24988 3100 2552 S 0,3 0,0 0:00.82 top
1 root 20 0 33608 4088 2628 S 0,0 0,0 0:01.35 init
2 root 20 0 0 0 0 S 0,0 0,0 0:00.00 kthreadd


(Christian Dahlqvist) #4

Is there anything else running on the Logstash host that might be using cup the rest of the CPU? Did you have 4 workers configured when you ran on VM with 8 cores? If so that would probably explain why it never went above 400%.

It would help if you could provide the version of Logstash you are using and show us your configuration.


(Laverio) #5

On this specific vm I have 8 cores and 8 workers, and nothing else is running. The Kafka queue is on another server reached by a 1Gbit lan.


(Christian Dahlqvist) #6

It would help if you could provide the version of Logstash you are using and show us your configuration.


(Laverio) #7

Logstash 5.1.2 with this config file.
input {
kafka {
topics => "asa-firewall-2"
type => "firewall"
consumer_threads => 32
max_poll_records => "8000"
bootstrap_servers => "10.64.2.225:9092"
}
}
filter {
grok {
match => {
"message" => [
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} inbound %{DATA:protocol} connection %{NUMBER:sessionid} for %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} (%{DATA:JUNK}) to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} (%{DATA:JUNK})$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} outbound %{DATA:protocol} connection %{NUMBER:sessionid} for %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} (%{DATA:JUNK}) to %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} (%{DATA:JUNK})$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} %{DATA:protocol} connection %{NUMBER:sessionid} for %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} duration %{TIME:duration} bytes %{DATA:bytes}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} %{DATA:protocol} connection %{NUMBER:sessionid} for %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} duration %{DATA:duration} bytes %{DATA:bytes} %{GREEDYDATA:event}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} dynamic %{DATA:protocol} translation from %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} duration %{TIME:duration}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} dynamic %{DATA:protocol} translation from %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} %{DATA:protocol} for faddr %{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} gaddr %{IPORHOST:JUNK}/%{NUMBER:JUNK} laddr %{IPORHOST:destIP}/%{NUMBER:destPORT}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description}; Connection for %{DATA:protocol} src %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} dst %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} %{DATA:description}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} for %{DATA:protocol} src %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} dst %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description}; Connection for %{DATA:protocol} src %{DATA:srcinterface}:%{IPORHOST:sourceIP} dst %{DATA:dstinterface}:%{IPORHOST:destIP} %{DATA:description}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} from %{DATA:srcinterface}:%{IPORHOST:sourceIP} (%{DATA:JUNK}) to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} (%{DATA:JUNK})$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} from %{DATA:srcinterface}:%{IPORHOST:sourceIP} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} duration %{TIME:duration} bytes %{DATA:bytes}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} for protocol %{DATA:protocol} src %{DATA:srcinterface}:%{IPORHOST:sourceIP} dst %{DATA:dstinterface}:%{IPORHOST:destIP}$",
"^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} from %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT}, %{DATA:description}$"
]
}
}

	date {
		match => [ "timestamp", "ISO8601" ]
                    target => "@timestamp"
	}
	if !("_grokparsefailure" in [tags]) {
                    mutate {
                            remove_field => [ "message", "JUNK" ]
                    }
            }
}
    output {
			elasticsearch {
				manage_template => "false"
                                    hosts => ["10.64.2.207", "10.64.2.208", "10.64.2.209"]
                                    index => [ "asa-%{+YYYY.MM}" ]
				flush_size => 5000
                            }

    }

(Magnus Bäck) #8

That grok expression is very, very inefficient. Stop using DATA everywhere and extract the longest common prefix to a separate grok filter. For example, it seems like every single expression starts with

^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description}

so extract that part (without using DATA) so it doesn't get matched again and again as the grok filter tries the expressions in order.


(Laverio) #9

I'll try replacing data with WORD and checking it in grok debug, however, splitting the anchored expression did not produce a performance increment.


(Christian Dahlqvist) #10

What does your updated configuration look like? Did you replace DATA with something more specific?


(Laverio) #11

Now these are may expressions:

"^%{TIMESTAMP_ISO8601:timestamp} <166>%%{CISCOTAG:msgtype}: %{CISCOMY305011_305012}$",
"^%{TIMESTAMP_ISO8601:timestamp} <166>%%{CISCOTAG:msgtype}: %{CISCOMY302020_302021}$",
"^%{TIMESTAMP_ISO8601:timestamp} <166>%%{CISCOTAG:msgtype}: %{CISCOMY302016}$",
"^%{TIMESTAMP_ISO8601:timestamp} <166>%%{CISCOTAG:msgtype}: %{CISCOMY302017_302018}$",
"^%{TIMESTAMP_ISO8601:timestamp} <163>%%{CISCOTAG:msgtype}: %{CISCOMY305006}$",
"^%{TIMESTAMP_ISO8601:timestamp} <166>%%{CISCOFW302010}$",
"^%{TIMESTAMP_ISO8601:timestamp} <166>%%{CISCOTAG:msgtype}: %{CISCOMY302013_302014_302015_302016}$",
"^%{TIMESTAMP_ISO8601:timestamp} <166>%%{CISCOTAG:msgtype}: %{CISCOMY303002}$"

I dug thru the Cisco expressions given with logstash, customizing some of them and now I'm on my way to try their efficency.


(Laverio) #12

On my 4 cores VM, with these expressions, the average messages changed from 3855/s to 3868/s... Losing time on optimizations wasn't worth the price.


(Laverio) #13

Well, after a three days training on ElasticSearch, and with a little help from teachers and documentation, I've found the clue. With 4000m/s (80 millions of docs a day) we were facing an I/O dispute on fiber channel disks due to our architecture and a mistake made from the sysadmins.

Basically, two of the three ES nodes were on the same physical machine, trying to use the same HW at the same time.

I wouldn't suggest a shared VMWare cluster with SAN FC disks on production environments if you plan to use it as realtime log analysis.


(system) #14

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.