Data enrichment Huge CPU consumption

I am trying to use logstash translate plugin to enrich data before putting to elasticsearch.
There are 4 tables I used to join, which I already exported as yml file.
The yml files are in the following format:
id: filedValue1~ filedValue2~ ... filedValueN

There are around 90 fields after I joined all the yml files with logstash message, and after joining, the message volume is around 1.2kb
When the messages gets fast, like around 40 messages per second, the cpu usage goes above 70%, and logstash stops working. The following is logstash code.

input {
beats {
port => 5044
codec => "json"
}
}
filter {
json { source => "message” }
if [join_id1]
{
#enrich data by adding creative information
translate {
field => " join_id1"
destination => "join1_dict"
dictionary_path => "enriched_data1.yml"
refresh_interval => 300
}
grok {
match => { " join1_dict" => "%{GREEDYDATA:field1}~%{GREEDYDATA: field2}~%{GREEDYDATA:fieldn}"}
remove_field => ["join1_dict"]
}
}
if [join_id2]
{
#enrich data by adding creative information
translate {
field => " join_id2"
destination => "join2_dict"
dictionary_path => "enriched_data2.yml"
refresh_interval => 400
}
grok {
match => { " join2_dict" => "%{GREEDYDATA:field1}~%{GREEDYDATA: field2}~%{GREEDYDATA:fieldn}"}
remove_field => ["join2_dict"]
}
}
if [join_id3]
{
#enrich data by adding creative information
translate {
field => " join_id3"
destination => "join3_dict"
dictionary_path => "enriched_data3.yml"
refresh_interval => 500
}
grok {
match => { " join3_dict" => "%{GREEDYDATA:field1}~%{GREEDYDATA: field2}~%{GREEDYDATA:fieldn}"}
remove_field => ["join3_dict"]
}
}
if [join_id4]
{
#enrich data by adding creative information
translate {
field => " join_id4"
destination => "join4_dict"
dictionary_path => "enriched_data4.yml"
refresh_interval => 350
}
grok {
match => { " join4_dict" => "%{GREEDYDATA:field1}~%{GREEDYDATA: field2}~%{GREEDYDATA:fieldn}"}
remove_field => ["join4_dict"]
}
}
output {
elasticsearch { … }
}

The following is the returned messages in command line:
The error on logstash side:

CircuitBreaker::rescuing exceptions {:name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn}
Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover. {:exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
Beats input: the pipeline is blocked, temporary refusing new connection. {:reconnect_backoff_sleep=>0.5, :level=>:warn}

The error on filebeat side:

2016/03/03 19:45:27.730998 single.go:76: INFO Error publishing events (retrying): EOF
2016/03/03 19:45:27.731022 single.go:152: INFO send fail

it would be good if logstash can chain the enrichment job as storm, so different worker does different part of data enrichment in an assembly line.

Is there any suggest you have to optimize data enrichment tasks?

That's pretty big.
Along with your filters it's not surprising you are using a lot of CPU. How many cores do you have?

Which version of Logstash?

match => { " join1_dict" => "%{GREEDYDATA:field1}~%{GREEDYDATA: field2}~%{GREEDYDATA:fieldn}"}

This is inefficient. How about using a mutate filter and its split option? Or a csv filter? Or at the very least use a cheaper expression like e.g.

match => { " join1_dict" => "(?<field1>[^~]+)~(?<field2>[^~]+)~%{GREEDYDATA:fieldn}"}

it would be good if logstash can chain the enrichment job as storm, so different worker does different part of data enrichment in an assembly line.

Logstash can process events parallel. Look at the -w option.

I have 8 cores, 64G RAM. I am using logstash version 2.2

I used csv filter, the cpu load is the same, and the pipeline ends unexpectedly after 7 minutes of running
:exception=>java.lang.OutOfMemoryError: Java heap space,

This is the corresponding output on commanding line

Beats input: the pipeline is blocked, temporary refusing new connection. {:reconnect_backoff_sleep=>0.5, :level=>:warn}
Beats input: the pipeline is blocked, temporary refusing new connection. {:reconnect_backoff_sleep=>0.5, :level=>:warn}
Beats input: the pipeline is blocked, temporary refusing new connection. {:reconnect_backoff_sleep=>0.5, :level=>:warn}
Beats Input: Remote connection closed {:peer=>"10.201.1.54:46258", :exception=>#<Lumberjack::Beats::Connection::ConnectionClosed: Lumberjack::Beats::Connection::ConnectionClosed wrapping: Lumberjack::Beats::Parser::UnsupportedProtocol, unsupported protocol 22>, :level=>:warn}
CircuitBreaker::rescuing exceptions {:name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn}
Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover. {:exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
Beats Input: Remote connection closed {:peer=>"10.201.1.54:46456", :exception=>#<Lumberjack::Beats::Connection::ConnectionClosed: Lumberjack::Beats::Connection::ConnectionClosed wrapping: Lumberjack::Beats::Parser::UnsupportedProtocol, unsupported protocol 22>, :level=>:warn}
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /opt/logstash/heapdump.hprof ...
Unable to create /opt/logstash/heapdump.hprof: File exists
LogStash::Filters::Translate: Bad Syntax in dictionary file, continuing with old dictionary {:dictionary_path=>"/opt/elkstats/pipeline/data_enrichment/creative.yml", :level=>:warn}
Beats input: unhandled exception {:exception=>java.lang.OutOfMemoryError: Java heap space, :backtrace=>["java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)", "java.nio.ByteBuffer.allocate(ByteBuffer.java:335)", "org.jruby.RubyEncoding$UTF8Coder.(RubyEncoding.java:273)", "org.jruby.RubyEncoding.getUTF8Coder(RubyEncoding.java:335)", "org.jruby.RubyEncoding.decodeUTF8(RubyEncoding.java:250)", "org.jruby.runtime.Helpers.decodeByteList(Helpers.java:3077)", "org.jruby.RubyString.decodeString(RubyString.java:798)", "org.jruby.RubyString.toJava(RubyString.java:7724)", "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:44)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.IfNode.interpret(IfNode.java:116)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.ast.RescueNode.executeBody(RescueNode.java:221)", "org.jruby.ast.RescueNode.interpret(RescueNode.java:116)", "org.jruby.ast.EnsureNode.interpret(EnsureNode.java:96)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.FCallOneArgNode.interpret(FCallOneArgNode.java:36)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)"], :level=>:error}

Before testing, I already edited the setting in /etc/sysconfig/logstash, and changed the setting to LS_HEAP_SIZE="2048", and my mornitoring tool newrelic showed the process only took 1.2GB memory when running.

Thank you magnusbaeck
This statement is more efficient, the cpu load decreased 2/3 compared to DGREEDYDATA
match => { " join1_dict" => "(?[^~]+)~(?[^~]+)~..."}