Performance Help

Right now I am using ES to store and search NSM logs produced with Bro. I am monitoring a 4.4 Gb/s network, which, as you can imagine, produces an extreme amount of log data.

My current setup:
Sensor using Logstash to read logs -> Kafka on the sensor (works great)
Kafka cluster reading logs from the sensor's Kafka (works great and ingests the logs with what appears to be minimal lag)
Kafka cluster stats:
3 Zookeepers:

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                4
On-line CPU(s) list:   0-3
Thread(s) per core:    1
Core(s) per socket:    4
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 63
Model name:            Intel(R) Xeon(R) CPU E5-2699 v3 @ 2.30GHz
Stepping:              2
CPU MHz:               2141.706
BogoMIPS:              4283.41
Hypervisor vendor:     Microsoft
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              46080K
NUMA node0 CPU(s):     0-3

RAM = 16GB

9 broker nodes with the same specs as the Zookeepers.
I then have 4 nodes running Logstash only:

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                8
On-line CPU(s) list:   0-7
Thread(s) per core:    1
Core(s) per socket:    8
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 45
Model name:            Intel(R) Xeon(R) CPU E5-2650 0 @ 2.00GHz
Stepping:              7
CPU MHz:               1966.048
BogoMIPS:              3932.09
Hypervisor vendor:     Microsoft
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              20480K
NUMA node0 CPU(s):     0-7

RAM = 32GB with 24GB set for the Java heap

These 4 nodes read the Kafka cluster topics, filter the data, and output to ES.
Below is my output section:

output {
##########    BRO Outputs -> ES Cluster    ##########
  if [type] == "BRO" {
    if [sensor1] == "host1" {
      elasticsearch {
        hosts => [ "10.1.55.8:9200","10.1.55.9:9200","10.1.55.10:9200","10.1.55.11:9200","10.1.55.12:9200","10.1.55.13:9200","10.1.55.14:9200","10.1.55.15:9200","10.1.55.16:9200","10.1.55.17:9200","10.1.55.18:9200","10.1.55.19:9200" ]
        manage_template => false
        flush_size => 5000
        idle_flush_time => 5
        workers => 8
        index => "sensor1-bro-%{+YYYY.MM.dd}"
      }
    }
    if [sensor2] == "host2" {
      elasticsearch {
        hosts => [ "10.1.55.8:9200","10.1.55.9:9200","10.1.55.10:9200","10.1.55.11:9200","10.1.55.12:9200","10.1.55.13:9200","10.1.55.14:9200","10.1.55.15:9200","10.1.55.16:9200","10.1.55.17:9200","10.1.55.18:9200","10.1.55.19:9200" ]
        manage_template => false
        flush_size => 5000
        idle_flush_time => 5
        workers => 8
        index => "sensor2-bro-%{+YYYY.MM.dd}"
      }
    }
  }
}

My ES cluster currently has 3 master nodes and 12 data nodes, all with the same specs:

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                16
On-line CPU(s) list:   0-15
Thread(s) per core:    1
Core(s) per socket:    16
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 45
Model name:            Intel(R) Xeon(R) CPU E5-2650 0 @ 2.00GHz
Stepping:              7
CPU MHz:               1941.527
BogoMIPS:              3883.05
Hypervisor vendor:     Microsoft
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              20480K
NUMA node0 CPU(s):     0-15

RAM = 32GB with 24GB set for the Java heap

I am using Marvel to monitor my cluster. I can see that the Java heap on my data nodes spikes every 30 minutes or so (see the attached screenshot of my Marvel heap graph).

Also, Marvel reports that my indexing rate is a little over 14,000/s, but no matter how many more nodes I add to my data cluster I don't see any increase in that rate.

My daily index volume is approximately 500 - 800 GB and 120 - 200 million documents per day.

The one error I am seeing in my logs is:

"status"=>500, "error"=>{"type"=>"timeout_exception", "reason"=>"Failed to acknowledge mapping update within [30s]"}}}, :level=>:warn}

I currently use dynamic templates, and I am thinking of creating a custom template and defining my fields explicitly to see if that helps.
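
In case it helps, a minimal sketch of what such a template could look like on ES 2.x. The template name and index pattern are placeholders, the field names are ones my filters already produce, and the mapping type assumes documents are indexed with type BRO; since my outputs use manage_template => false I would have to load this manually:

PUT _template/bro_logs
{
  "template": "sensor*-bro-*",
  "mappings": {
    "BRO": {
      "properties": {
        "src_ip":     { "type": "ip" },
        "resp_ip":    { "type": "ip" },
        "src_port":   { "type": "integer" },
        "resp_port":  { "type": "integer" },
        "conn_state": { "type": "string", "index": "not_analyzed" }
      }
    }
  }
}

(Note: the ip type in 2.x only handles IPv4, so fields that may carry IPv6 addresses are probably safer left as not_analyzed strings.)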

My main issue is that when my network hits peak times, Elasticsearch cannot index the data fast enough to keep up. So the question becomes: how can I increase indexing speed? I have resources available to increase RAM and CPUs (not unlimited, but a fair amount), but as stated, I've added additional nodes to my data cluster and seen no return on that investment.

Thanks

Hi! It looks like a very similar issue to this one.
Please provide information about your ES version and which non-default settings are applied.

Additional questions:
How do you know that the indexing rate is not enough?
How are you planning to use the collected data (search for exact values, full-text search, do you need relevancy scoring, maybe aggregations)?

I am using:
ES 2.2.0
Logstash 2.2.2

The only non-default settings I have are:
discovery.zen.ping.unicast.hosts: ["Shared A record for master nodes"]

and

marvel.agent.exporters:......

My indices currently have 5 shards and 3 replicas.

I know that my indexing rate is not enough because I am seeing a TON of backpressure on my Kafka nodes. I require near real-time access to this data (30 min lag max). When monitoring my sensor I can see that all data leaves the Kafka node there and enters my Kafka cluster. During periods of slower traffic the ES data cluster keeps up fine, but when traffic hits peak daily activity the cluster cannot index the data fast enough. Like I said, no matter how many nodes I add to the data cluster (I've only added 3 additional nodes), my maximum indexing rate seems to be 14,000/s. That's fine, but I am generating more events per second than that.

As for what I am doing with the data: I am using it with CIF to allow a team of network CND analysts to perform analytics looking for compromises of the network. I need the ability to do full-text searching (using Kibana at this time), and I am building aggregations, dashboards, etc. to let the analysts see relevant data and then search for the full story in the logs.

I'm looking at the referenced post, thank you. Any tuning specifics you can give would help me greatly.

Thanks

How did you verify that it is Elasticsearch and not your Logstash setup that is the bottleneck? How often do you see the mapping error in your logs? How many daily indices/shards do you have? Are these distributed evenly across the cluster?

My LS nodes are not using much RAM at all and their CPU utilization is low, and I am not seeing any errors in the logs besides the 500 mapping error. That is the reason I "ruled them out". Also, I see that JVM heap usage seems to increase over time on my ES nodes. If the Logstash nodes are the problem, great, I'm happy to tune them to whatever specs are necessary; I really just want this to run like a champ.

output from GET _cat/indices:

green open sensor1-bro-2016.02.29    5 3         4 0 263.7kb  65.9kb
green open sensor1-bro-2016.03.09    5 3   1315466 0   4.2gb     1gb
green open sensor1-bro-2016.03.08    5 3   3496765 0   9.8gb   2.4gb
green open sensor-metrics-2016.03.09 5 3     17280 0  23.7mb   5.9mb
green open sensor-metrics-2016.03.08 5 3      8478 0  12.1mb     3mb
green open sensor2-bro-2016.03.10    5 3 120087080 0 473.7gb 115.9gb
green open sensor-metrics-2016.03.10 5 3     10642 0  14.3mb   3.5mb
green open sensor1-bro-2016.03.10    5 3    252926 0 849.8mb 211.3mb
green open .kibana                   1 3       163 7 991.7kb 247.9kb
green open sensor2-bro-1970.01.01    5 3      2746 0   3.3mb   943kb
green open topbeat-2016.03.08        5 3   2242728 0   1.8gb 473.5mb
green open topbeat-2016.03.09        5 3   4386754 0   3.6gb 927.6mb
green open sensor2-bro-2016.03.07    5 3      2332 0  10.9mb   2.5mb
green open sensor1-bro-2016.03.03    5 3         3 0 153.1kb  49.6kb
green open sensor2-bro-2016.03.08    5 3 120728499 0   534gb 133.2gb
green open sensor2-bro-2016.03.09    5 3 213855685 0 874.6gb 218.3gb
green open sensor1-bro-1970.01.01    5 3        28 0 867.5kb 216.8kb
green open sensor1-bro-2016.03.07    5 3        13 0     1mb 268.6kb
green open topbeat-2016.03.10        5 3   2695050 0   2.2gb 567.3mb
green open sensor2-bro-2016.03.05    5 3        23 0   2.8mb 745.2kb
green open sensor1-bro-2016.03.05    5 3         3 0 198.9kb  49.7kb
green open sensor2-bro-2016.03.06    5 3        15 0     2mb 520.6kb

As for the mapping error, I have seen it 3 times already today, so it's not constant or tied to any specific interval.

Marvel reports that my shards are distributed evenly (35 or 36 per node at this moment). Right now I have 17 nodes, 22 indices, and 424 shards, with 470,590,830 documents and 2 TB of data.

Thanks

First, do you really need so many replicas? Try decreasing it to 1 or 0 and see if it helps (right now you index your data 4 times: once for the primary and once for each replica).
Second, as I mentioned in that topic, disable the _all field if you don't plan to use it.
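
For what it's worth, a minimal sketch of the replica change on the existing daily indices via the settings API (index patterns taken from the _cat output above; new indices would need the same change in their template):

PUT /sensor1-bro-*,sensor2-bro-*/_settings
{
  "index": { "number_of_replicas": 1 }
}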

If your data structure is predictable this is great, as you get full control over every field (type, analyzer, doc_values, and so on). Disabling doc_values for fields that will never take part in aggregations may also give some speedup. Also exclude fields from indexing (index: no) if you don't plan to search or filter on them.
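
As a sketch, those mapping options look like this inside an ES 2.x template; which fields to apply them to is a guess on my part (the example field names are taken from the filter config posted later in this thread):

"mappings": {
  "BRO": {
    "_all": { "enabled": false },
    "properties": {
      "src_Senderbase_lookup": { "type": "string", "index": "no" },
      "conn_state_full":       { "type": "string", "index": "not_analyzed", "doc_values": false }
    }
  }
}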

You can also look at the settings and discussion in the topic "ES indexing rate varies horribly".

What does the rest of your Logstash configuration look like? Have you tried feeding Logstash from a file instead of from Kafka to check whether the Kafka input might be a limiting factor?

I will post my entire config below in 3 separate posts (due to character limitations)

I changed my replicas to 1 (I had more to ensure data availability in case of HW failure, but I can live with one if it speeds things up)

I changed "_all": { "omit_norms": true, "enabled": true} to enabled: false.

I cannot send directly from my sensor to my ES cluster. The sensor is remote and, for network security reasons, it cannot communicate directly with anything except the Kafka cluster.

Logstash config:

INPUT SECTION:

input {
  kafka {
    zk_connect => "10.1.84.3:2181,10.1.84.4:2181,10.1.84.5:2181"
    topic_id => "bro-logs"
  }
}

FILTER:

filter {
  if [type] == "BRO" {
    mutate {
      rename => { "_path" => "log_type" }
    }
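    # Use the Bro ts field as the event timestamp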
    date {
      match => ["ts", "UNIX_MS"]
    }
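    # For conn logs: expand conn_state codes into readable descriptions and cast numeric fields to integers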
    if [log_type] == "conn" {
      translate {
        field => "conn_state"
        destination => "conn_state_full"
        dictionary => [
          "S0", "Connection attempt seen, no reply",
          "S1", "Connection established, not terminated",
          "S2", "Connection established and close attempt by originator seen (but no reply from responder)",
          "S3", "Connection established and close attempt by responder seen (but no reply from originator)",
          "SF", "Normal SYN/FIN completion",
          "REJ", "Connection attempt rejected",
          "RSTO", "Connection established, originator aborted (sent a RST)",
          "RSTR", "Established, responder aborted",
          "RSTOS0", "Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder",
          "RSTRH", "Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator",
          "SH", "Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)",
          "SHR", "Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator",
          "OTH", "No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)"
        ]
      }
      mutate {
        convert => { "src_ip" => "integer" }
        convert => { "src_port" => "integer" }
        convert => { "src_bytes" => "integer" }
        convert => { "resp_bytes" => "integer" }
        convert => { "missed_bytes" => "integer" }
        convert => { "src_pkts" => "integer" }
        convert => { "orig_ip_bytes" => "integer" }
        convert => { "resp_pkts" => "integer" }
        convert => { "resp_ip_bytes" => "integer" }
      }
    }
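    # Rename Bro id.orig_*/id.resp_* and orig_* fields to friendlier names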
    mutate {
      rename => { "id.orig_h" => "src_ip" }
      rename => { "id.orig_p" => "src_port" }
      rename => { "id.resp_h" => "resp_ip" }
      rename => { "id.resp_p" => "resp_port" }
      rename => { "orig_bytes" => "src_bytes" }
      rename => { "orig_pkts" => "src_packets" }
      rename => { "orig_cc" => "src_cc" }
    }
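    # GeoIP enrichment (conn and notice logs carry the IPs in different fields)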
    if [log_type] == "conn" {
      geoip {
        source => "src_ip"
        target => "geoip_src"
      }
      geoip {
        source => "resp_ip"
        target => "geoip_resp"
      }
    }
    else if [log_type] == "notice" {
      geoip {
        source => "src"
        target => "geoip_src"
      }
      geoip {
        source => "dst"
        target => "geoip_resp"
      }
    }
    if [log_type] == "ssl" {
      mutate {
        rename => { "version" => "ssl_version" }
      }
    }
    if [log_type] == "ssh" {
      mutate {
        rename => { "version" => "ssh_version" }
      }
    }
    if [log_type] in ["conn","notice","intel","dns","http_eth1","http_eth2","http_eth3","http_eth4","http_eth5","weird","ssl","ssh"] {
      if [src_ip] !~"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)|(^224\.0\.0\.)" {  ##Disable Private IP space from Lookup Source###
        mutate {
          add_field => { "src_Senderbase_lookup" => "http://www.senderbase.org/lookup/?search_string=%{src_ip}" }
          add_field => { "src_CBL_lookup" => "http://cbl.abuseat.org/lookup.cgi?ip=%{src_ip}" }
          add_field => { "src_Spamhaus_lookup" => "http://www.spamhaus.org/query/bl?ip=%{src_ip}" }
          add_field => { "src_DomainTools_lookup" => "http://whois.domaintools.com/%{src_ip}" }
        }
      }
      if [resp_ip] !~"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)|(^224\.0\.0\.)" {  ##Disable Private IP space from Lookup Source##
        mutate {
          add_field => { "resp_Senderbase_lookup" => "http://www.senderbase.org/lookup/?search_string=%{resp_ip}" }
          add_field => { "resp_CBL_lookup" => "http://cbl.abuseat.org/lookup.cgi?ip=%{resp_ip}" }
          add_field => { "resp_Spamhaus_lookup" => "http://www.spamhaus.org/query/bl?ip=%{resp_ip}" }
          add_field => { "resp_DomainTools_lookup" => "http://whois.domaintools.com/%{resp_ip}" }
        }
      }
    }
    if [log_type] in ["http_eth1","http_eth2","http_eth3","http_eth4","http_eth5"] {
      mutate {
        add_field => {"url_full" => "%{host}%{uri}"}
      }
    }
  }
}

OUTPUT:

output {
##########    BRO Outputs -> ES Cluster    ##########
  if [type] == "BRO" {
    if [sensor1] == "host1" {
      elasticsearch {
        hosts => [ "10.1.55.8:9200","10.1.55.9:9200","10.1.55.10:9200","10.1.55.11:9200","10.1.55.12:9200","10.1.55.13:9200","10.1.55.14:9200","10.1.55.15:9200","10.1.55.16:9200","10.1.55.17:9200","10.1.55.18:9200","10.1.55.19:9200" ]
        manage_template => false
        flush_size => 5000
        idle_flush_time => 5
        workers => 8
        index => "sensor1-bro-%{+YYYY.MM.dd}"
      }
    }
    if [sensor2] == "host2" {
      elasticsearch {
        hosts => [ "10.1.55.8:9200","10.1.55.9:9200","10.1.55.10:9200","10.1.55.11:9200","10.1.55.12:9200","10.1.55.13:9200","10.1.55.14:9200","10.1.55.15:9200","10.1.55.16:9200","10.1.55.17:9200","10.1.55.18:9200","10.1.55.19:9200" ]
        manage_template => false
        flush_size => 5000
        idle_flush_time => 5
        workers => 8
        index => "sensor2-bro-%{+YYYY.MM.dd}"
      }
    }
  }
}

If neither Logstash nor Elasticsearch appears busy, it could very well be the Kafka input that is limiting performance, although I have not tuned that plugin so I cannot tell for sure. It might be useful to write the data to a file and feed that into Logstash through a file input. Depending on whether this gives higher throughput and increases the load on Logstash and Elasticsearch, you should be able to tell whether the Kafka input is the source of the problem.
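
A minimal sketch of that test, with the file path and sincedb location made up (the rest of the pipeline stays the same):

input {
  file {
    path => "/tmp/bro-sample.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "json"
  }
}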

How many filter workers are you using for Logstash?
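
For reference, the worker count in Logstash 2.2 is set at startup with the -w flag; a hypothetical invocation matching the 8 cores on the LS nodes (worth checking against --help for your exact version):

bin/logstash -w 8 -f /etc/logstash/conf.d/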

I am seeing a huge increase in indexing throughput when I modify the Kafka input to use codec => "plain".
I am still trying to figure out why the default json codec is slower. YMMV, but it will not hurt to try it out.
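
Applied to the Kafka input posted earlier, the suggested change would look something like this (whether the JSON payload then needs a separate json filter to be parsed is something to verify):

input {
  kafka {
    zk_connect => "10.1.84.3:2181,10.1.84.4:2181,10.1.84.5:2181"
    topic_id => "bro-logs"
    codec => "plain"
  }
}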

I'll give it a shot in the AM and report back.

The codec change did not help me. My data is already in JSON, and when I used codec => "plain" it appeared to not pick up all my data.

I may have done it wrong or didn't wait long enough, but if my data is in JSON I would assume I'd have to use a JSON codec for it to be parsed correctly.

All of my filters are listed above. My LS nodes don't appear busy but my ES nodes do.

Toward the beginning of the thread I posted a screenshot of one of my worker nodes; the JVM heap increases to a very high level every 30 minutes or so. I also still get:

"status"=>500, "error"=>{"type"=>"timeout_exception", "reason"=>"Failed to acknowledge mapping update within [30s]"}}}, :level=>:warn}

Is there a way to increase the mapping timeout? I know it won't solve the problem, but it would at least let me avoid dropping logs until I figure things out.
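
From what I can tell, the 30s in that message is the dynamic mapping update timeout. Assuming the setting indices.mapping.dynamic_timeout exists and is dynamic in 2.2 (I have not verified this), it could be raised through the cluster settings API, something like:

PUT /_cluster/settings
{
  "transient": {
    "indices.mapping.dynamic_timeout": "60s"
  }
}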

Why so many replicas?

@tgdesrochers The heap increase and the message are symptomatic of an outlier, a very large document in the stream.

The reason for the replicas was that it was explained to me that replicas allow us to survive hardware failures without losing any data. We have snapshots enabled now, so I guess if we had a failure we would only potentially lose a fraction of the data.

I have cut the replicas to 1 and disabled the _all field. Things are looking better, but Monday morning will be the real test.

My analysts are now complaining that they have to search specific fields and cannot do a global search for text or other data, but for speed and efficiency I guess they will have to put up with a mild inconvenience.

So there are no large documents; these are all Bro IDS logs. Could it be symptomatic of an increase in network usage causing a large number of logs to be written, which in turn causes a temporary load on the ES cluster's indexing?