I am trying to use the Logstash translate plugin to enrich data before sending it to Elasticsearch.
There are 4 tables I join against, which I have already exported as yml files.
The yml files are in the following format:
id: fieldValue1~fieldValue2~ ... fieldValueN
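For example, a single lookup entry in one of the yml files looks like this (the key and the values here are hypothetical placeholders):

```yaml
# one entry of enriched_data1.yml: a lookup key mapped to a "~"-joined value string
"12345": "fieldValue1~fieldValue2~fieldValueN"
```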
There are around 90 fields after joining all four yml files with the Logstash message, and after joining, each message is around 1.2 KB.
When messages arrive quickly, around 40 messages per second, CPU usage goes above 70% and Logstash stops working. The following is my Logstash config:
input {
  beats {
    port => 5044
    codec => "json"
  }
}
filter {
  json { source => "message" }
  if [join_id1] {
    # enrich data by adding creative information
    translate {
      field => "join_id1"
      destination => "join1_dict"
      dictionary_path => "enriched_data1.yml"
      refresh_interval => 300
    }
    grok {
      match => { "join1_dict" => "%{GREEDYDATA:field1}~%{GREEDYDATA:field2}~%{GREEDYDATA:fieldn}" }
      remove_field => ["join1_dict"]
    }
  }
  if [join_id2] {
    # enrich data by adding creative information
    translate {
      field => "join_id2"
      destination => "join2_dict"
      dictionary_path => "enriched_data2.yml"
      refresh_interval => 400
    }
    grok {
      match => { "join2_dict" => "%{GREEDYDATA:field1}~%{GREEDYDATA:field2}~%{GREEDYDATA:fieldn}" }
      remove_field => ["join2_dict"]
    }
  }
  if [join_id3] {
    # enrich data by adding creative information
    translate {
      field => "join_id3"
      destination => "join3_dict"
      dictionary_path => "enriched_data3.yml"
      refresh_interval => 500
    }
    grok {
      match => { "join3_dict" => "%{GREEDYDATA:field1}~%{GREEDYDATA:field2}~%{GREEDYDATA:fieldn}" }
      remove_field => ["join3_dict"]
    }
  }
  if [join_id4] {
    # enrich data by adding creative information
    translate {
      field => "join_id4"
      destination => "join4_dict"
      dictionary_path => "enriched_data4.yml"
      refresh_interval => 350
    }
    grok {
      match => { "join4_dict" => "%{GREEDYDATA:field1}~%{GREEDYDATA:field2}~%{GREEDYDATA:fieldn}" }
      remove_field => ["join4_dict"]
    }
  }
}
output {
  elasticsearch { … }
}
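To show what each translate + grok pair does, here is the first join traced through a hypothetical event (the id and the values below are invented for illustration):

```yaml
# enriched_data1.yml contains, e.g.:
"12345": "v1~v2~v3"

# event arrives with:          join_id1 => "12345"
# after translate:             join1_dict => "v1~v2~v3"
# after grok (split on "~"):   field1 => "v1", field2 => "v2", fieldn => "v3"
#                              and join1_dict is removed
```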
The following are the messages returned on the command line.
The error on the Logstash side:
CircuitBreaker::rescuing exceptions {:name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn}
Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover. {:exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
Beats input: the pipeline is blocked, temporary refusing new connection. {:reconnect_backoff_sleep=>0.5, :level=>:warn}
The error on the Filebeat side:
2016/03/03 19:45:27.730998 single.go:76: INFO Error publishing events (retrying): EOF
2016/03/03 19:45:27.731022 single.go:152: INFO send fail
It would be good if Logstash could chain the enrichment jobs the way Storm does, so that different workers handle different parts of the data enrichment in an assembly line.
Do you have any suggestions for optimizing data enrichment tasks like this?