Logstash 1.5.3 crashes under moderate load

We've set up a (R)ELK stack (Redis plus ELK) in AWS. Everything works without issue until we hit around 10k documents (roughly 100 MB) indexed into Elasticsearch per second (as seen in Kopf). At about that point, all of our Logstash indexing nodes crash, events stop being sent to Elasticsearch, and the Redis cache starts to build up.

When I look at the Logstash processes, nothing is happening:
I use top -Hp $(cat /var/run/logstash.pid) to check all the threads belonging to Logstash. Normally I see <redis, |worker, and >elasticsearch threads as well as java. However, when this incident happens, all threads just show as "java" with 0% CPU and 2.7% memory.
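In case it helps, this is roughly how I'm inspecting the hung process; the jstack thread dump is just an extra step I've been trying, and it assumes the JDK is installed on the indexers and that Logstash runs as the logstash user:

LS_PID=$(cat /var/run/logstash.pid)
# per-thread CPU and memory for the Logstash JVM
top -Hp "$LS_PID"
# full Java thread dump, to see what the unnamed "java" threads are blocked on
sudo -u logstash jstack "$LS_PID" > /tmp/logstash-threads.txt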

When I run Logstash in debug mode, I see the following error:
Exception in thread "Ruby-0-Thread-78: /opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.20/lib/stud/buffer.rb:92"
Exception in thread "Ruby-0-Thread-111: /opt/logstash/vendor/bundle/jruby/1.9/gems/stud-0.0.20/lib/stud/buffer.rb:92"
java.lang.UnsupportedOperationException
at java.lang.Thread.stop(Thread.java:869)
at org.jruby.RubyThread.exceptionRaised(RubyThread.java:1221)
at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:112)
at java.lang.Thread.run(Thread.java:745)
java.lang.UnsupportedOperationException

This is followed by errors stating that Logstash cannot reach the Elasticsearch ELB, as well as this message:
{:timestamp=>"2015-08-06T15:07:54.626000-0700", :message=>"retrying failed action with response code: 429", :level=>:warn}

At that point it completely stops sending to Elasticsearch.
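If I understand the 429 correctly, it means Elasticsearch itself is rejecting bulk requests because its bulk thread pool queue is full, so something like the following should show the rejection counters (the ELB hostname is just a placeholder); if the rejected column climbs during the incident, the bottleneck may be on the ES side rather than in Logstash:

curl -s 'http://es-elb.internal:9200/_cat/thread_pool?v&h=host,bulk.active,bulk.queue,bulk.rejected'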

We have a fairly complex log filter; however, the Logstash indexing nodes are barely utilizing resources when the crash happens (< 10% CPU, < 25% memory in use, plenty of disk space).

When I blackhole the biggest filter section using

if "request-json" == [type] {
  drop { }
}

and restart Logstash, everything works fine at the current levels we are seeing.

Are there any suggestions for tuning Logstash indexing/filtering to handle large messages better? Are there system tweaks that would help Logstash cope with them?
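For reference, the knobs I'm aware of so far are the filter worker count (-w), the JVM heap (LS_HEAP_SIZE), and the batching on the elasticsearch output; the values and hostname below are only illustrative, not what we currently run:

# /etc/default/logstash (or /etc/sysconfig/logstash) on the indexers
LS_HEAP_SIZE="2g"
LS_OPTS="-w 8"   # number of filter worker threads

# elasticsearch output section of the indexer config
output {
  elasticsearch {
    host => "es-elb.internal"   # placeholder for our ES ELB
    protocol => "http"
    flush_size => 500           # send smaller bulk batches for these large events
    workers => 4                # parallel output workers
  }
}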

Our architecture is as follows:
Logstash-forwarder runs on our application nodes and sends logs to a Logstash shipper ELB with 3 Logstash shipper nodes behind it. The shippers output to 3 Redis nodes. A fleet of 6 c3.4xlarge Logstash indexing nodes reads from the 3 Redis queues, and those indexers output to an Elasticsearch ELB fronting 16 m2.4xlarge data nodes, alongside 3 m2.4xlarge Elasticsearch master nodes. Kibana then reads from the Elasticsearch master nodes.
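In short, the data flow is:

logstash-forwarder (application nodes)
  -> ELB -> 3x Logstash shipper nodes
  -> 3x Redis nodes
  -> 6x c3.4xlarge Logstash indexing nodes
  -> ELB -> Elasticsearch (16x m2.4xlarge data nodes, 3x m2.4xlarge dedicated masters)
  -> Kibana (reading via the master nodes)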

I will add a comment with the sanitized snippet as I'm at the character limit.

filter {
  if "somelog-json" == [type] {

    drop { }

    json {
      source => "message"
      target => "payload"
    }

    json {
      source => "[payload][message]"
      target => "[payload][detail]"
    }

    if [payload][detail][sanitized] {
      mutate {
        add_field => { "is_cmd" => false }
      }
    }
    else {
      mutate {
        add_field => { "is_cmd" => true }
      }
    }

    mutate {
      add_field => { "sanitized" => "%{[payload][detail][sanitized]}" }
      add_field => { "sanitized" => "%{[payload][detail][data][message][invoke][sanitized][c]}" }
      add_field => { "sanitized" => "%{[payload][detail][data][message][invoke][sanitized]}" }
      add_field => { "sanitized" => "%{[payload][detail][data][message][invoke][sanitized]}" }
      add_field => { "sanitized" => "%{[payload][detail][sanitized]}" }
      add_field => { "sanitized" => "%{[payload][detail][sanitized]}" }
      add_field => { "sanitized" => "%{[payload][detail][sanitized]}" }
      remove_field => [ "[payload][detail]" ]
    }

    mutate {
      add_field => { "tmp_raw_request" => "%{[payload][message]}" }
      remove_field => [ "message" ]
      remove_field => [ "[payload][message]" ]
    }

    range {
      ranges => [ "tmp_raw_request", 0, 2048, "field:raw_request:%{tmp_raw_request}",
                  "tmp_raw_request", 2048, 99999999999, "field:raw_request:TOO_LONG_OVER_2048" ]
    }

    mutate {
      remove_field => [ "tmp_raw_request" ]
    }

    date {
      match => [ "[payload][@timestamp]", "ISO8601" ]
    }

    if [payload][sanitized][sanitized][sanitized] {
      mutate {
        add_field => { "sanitized" => "%{[payload][sanitized][sanitized][sanitized]}" }
      }
    }
    else {
      mutate {
        add_field => { "sanitized" => "UNKNOWN" }
      }
    }

    if [payload][sanitized][sanitized][Id] {
      mutate {
        add_field => { "id" => "%{[payload][sanitized][sanitized][Id]}" }
      }
    }
    else {
      mutate {
        add_field => { "id" => "UNKNOWN" }
      }
    }

} else if "someotherlog" == [type] {
grok {
match => {
"message" => "(?.{23})[^{]+(?.*)"
}
}
date {
match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,SSS" ]
timezone => "America/Los_Angeles"
}
json {
source => "json"
target => "payload"
}

if "SpecialError" == [payload][errortypefield] {
ruby {
init => "require 'base64'"
code => "event['stacktrace'] = Base64.decode64(event['payload']['customData']) if event.include?('payload') and event['payload'].include?('customData')"
}
mutate {
add_field => {"errorMessage" => "%{[stacktrace]}"}
}
else {
grok {
match => {
"[payload][message]" => "(?[^\n\t\r]*)"
}
}
}

It might be ES. What are you monitoring it with, other than Kopf?

ES seems stable. We are monitoring it with Nagios, Zabbix, and Datadog. ES doesn't report any issues and responds to other traffic without problems.

Many thanks.
