Filebeat CPU high

Hi,
We are using Filebeat to collect logs from various folders. For example, the A service calls A1, A2 and A3.

All the logs are written to their respective folders. Filebeat pushes the logs to Logstash. In Logstash we receive them and concatenate them in the same order in which the A service makes its calls.

In our Filebeat configuration we set backoff_factor to .000001s. Because of this the CPU usage reaches around 40%. If we set the backoff factor to 1s then we are not able to concatenate the logs correctly, and some of the logs are missed because of the 1s delay.
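For reference, here is a minimal sketch of the prospector-level scan settings in Filebeat 1.x; the paths and values are made up for illustration and the Logstash output section is assumed, not taken from our setup. Note that backoff and max_backoff are durations, while backoff_factor is a multiplier applied to the backoff interval, not a time:

filebeat:
  prospectors:
    # One prospector per service log folder (illustrative paths)
    - input_type: log
      paths:
        - /var/log/A1/*.log
        - /var/log/A2/*.log
      # Initial wait after reaching EOF before checking the file for new lines
      backoff: 100ms
      # Multiplier applied to the backoff interval after each idle check
      backoff_factor: 2
      # Upper bound on how long Filebeat waits between checks of an idle file
      max_backoff: 1s
output:
  logstash:
    hosts: ["localhost:5044"]

With settings along these lines the per-file polling interval grows from backoff up to max_backoff while a file is idle, so CPU usage should be far lower than polling every microsecond.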

In Logstash we are using the aggregate filter plugin.

A low backoff makes Filebeat probe files for new content more frequently. The lines read from the log files are forwarded to the spooler, which has a flush timeout of 1s by default (consider reducing the flush timeout). I've no idea about the timings required by the aggregate plugin, though. Maybe check the docs.
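If you are on Filebeat 1.x, the spooler is tuned with spool_size and idle_timeout at the top filebeat level; a rough sketch of what reducing the flush interval could look like (the values are illustrative, not recommendations):

filebeat:
  # Maximum number of events buffered before the spooler is flushed
  spool_size: 1024
  # Flush the spooler after this much time even if spool_size is not reached;
  # lowering it reduces batching latency at the cost of smaller batches
  idle_timeout: 500ms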

Hi Steffens,

Instead of doing the aggregation in Logstash, can we shift the process to Elasticsearch? Is it possible to write the output from Elasticsearch to a file?

Please explain in more detail what you want to do. What kind of concatenation are you doing?

Please add some more details as requested by @magnusbaeck.

Main Flow:
2016-05-31 11:42:39,745 429308876 [http-nio-10080-exec-27] 01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - entry:"Agent-register-bs-2.0"|Agent-register-bs-2.0:
2016-05-31 11:42:39,754 429308885 [http-nio-10080-exec-27] 01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - send:"A1- Service"
2016-05-31 11:42:39,757 429308888 [http-nio-10080-exec-27] 01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - receive:"A1-Server"
2016-05-31 11:42:40,214 429309345 [http-nio-10080-exec-27] 01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - send:"A2-Service"
2016-05-31 11:42:40,216 429309347 [http-nio-10080-exec-27] 01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - receive:"A2-Service"
2016-05-31 11:42:40,286 429309417 [http-nio-10080-exec-27] 01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - exit:"Agent-register-bs-2.0"|Status:201|Agent-register-bs-2.0:""

In the A1 logs:

2016-05-31 00:40:12,304 304414583 [http-nio-10180-exec-42] 01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - account-information-ds-1.1:""
2016-05-31 01:00:11,982 651154503 [http-nio-10380-exec-72] 01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - entry:"Agent-account"
2016-05-31 01:00:12,189 651154710 [http-nio-10380-exec-72] 01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - exit:"-Agentaccount"|Status:201|Agent-account:""

In the A2 logs:

They follow the same format as above.

Final output:

2016-05-31T11:52:49,825 sessionId=01daa1e7-3ea0-4b34-94c5-bb588bf67a5e - exit:Agent-register_bs_2|Status:201|Response_Time:554|instance:|sessionId=Agent|m_type:A1|Status:200|Response_Time:149||A2|Status:200|Response_Time:143||audit-management-ds-2.0|Status:200|Response_Time:10||register_bs_2

What we need is the final output. In the Main Flow you can see the main service calling A1, A2, etc. In the main service's exit line we have the response code and time. For each service called from the main flow, we need to capture the response time and status code in the final output in the same way.

We are crawling at .000001s because each service's logs are in a separate folder.

Please find my Logstash concatenation configuration below. Is there any way to tweak this by reducing the 1s in Filebeat and still collect the logs and produce the format described above?

Logstash config file

Split the fields and parse the tag fields

#Extract the message fields
filter {
grok {
patterns_dir => "/dummy_patterns/logstash-2.1.1/conf.d/patterns"
match => [ "message", "%{TIMESTAMP_ISO8601:message_timestamp} %{NUMBER} %{UNKNOWN_FIELD1} %{CORRELATION_ID:correlation_Id} %{GREEDYDATA:syslog5424_msg}" ]
}
#Extract the source fields
grok {
patterns_dir => "/dummy_patterns/logstash-2.1.1/conf.d/patterns"
match => [ "source", "%{SLASH}%{WORD}%{SLASH}%{WORD}%{SLASH}%{GREEDYDATA:parent_Service_Name}%{SLASH}%{GREEDYDATA}" ]

}

#Substitute "-" with "_" in the service name so it can be visualised in Kibana
mutate { gsub => ["parent_Service_Name","-","_"] }

mutate { gsub => ["message_timestamp"," ","T"] }

#Match the timestamp
date {
match => [ "message_timestamp", "ISO8601" ]
}

#Parse fields on the basis of operation type
if "entry" in [syslog5424_msg] {
mutate { add_field => { "operation_Type" => "entry"}}
grok {
patterns_dir => "/dummy_patterns/logstash-2.1.1/conf.d/patterns"
match => [ "syslog5424_msg", "%{HYPEN} entry:%{QUOTES}%{GREEDYDATA:message_Service_Name}%{QUOTES}" ]
}
} else if (("send" in [syslog5424_msg]) or ("Send" in [syslog5424_msg])) {
mutate { add_field => { "operation_Type" => "send"} }
grok {
patterns_dir => "/dummy_patterns/logstash-2.1.1/conf.d/patterns"
match => [ "syslog5424_msg", "%{HYPEN} send:%{QUOTES}%{WORD}://%{HOSTPORT}/%{GREEDYDATA:message_Service_Name}%{SLASH}%{WORD}%{SLASH}%{NUMBER}" ]
match => [ "syslog5424_msg", "%{HYPEN} send:%{QUOTES}%{WORD}://%{HOSTPORT}/%{GREEDYDATA:message_Service_Name}%{SLASH}%{WORD}%{SLASH}%{WORD}" ]
match => [ "syslog5424_msg", "%{HYPEN} send:%{QUOTES}%{WORD}://%{HOSTPORT}/%{GREEDYDATA:message_Service_Name}%{SLASH}%{WORD}" ]
match => [ "syslog5424_msg", "%{HYPEN} Send: %{GREEDYDATA:message_Service_Name}:%{QUOTES}" ]

}

} else if (("receive" in [syslog5424_msg]) or ("Receive" in [syslog5424_msg])) {
mutate { add_field => { "operation_Type" => "receive"} }
grok {
patterns_dir => "/dummy_patterns/logstash-2.1.1/conf.d/patterns"
match => [ "syslog5424_msg", "%{HYPEN} receive:%{QUOTES}%{WORD}://%{HOSTPORT}/%{GREEDYDATA:message_Service_Name}%{SLASH}%{WORD}%{SLASH}%{NUMBER}" ]
match => [ "syslog5424_msg", "%{HYPEN} receive:%{QUOTES}%{WORD}://%{HOSTPORT}/%{GREEDYDATA:message_Service_Name}%{SLASH}%{WORD}%{SLASH}%{WORD}" ]
match => [ "syslog5424_msg", "%{HYPEN} receive:%{QUOTES}%{WORD}://%{HOSTPORT}/%{GREEDYDATA:message_Service_Name}%{SLASH}%{WORD}" ]
match => [ "syslog5424_msg", "%{HYPEN} Receive: %{GREEDYDATA:message_Service_Name}:%{QUOTES}" ]
}
} else if ("exit" in [syslog5424_msg]) {
mutate { add_field => { "operation_Type" => "exit"} }
grok {
patterns_dir => "/dummy_patterns/logstash-2.1.1/conf.d/patterns"
match => [ "syslog5424_msg", "%{HYPEN} exit:%{QUOTES}%{DATA:message_Service_Name}%{QUOTES}%{GREEDYDATA:rawdata}" ]
}
}

#Second phase
if ([rawdata] =~ /.+/ ) {
grok {
patterns_dir => "/dummy_patterns/logstash-2.1.1/conf.d/patterns"
match => [ "rawdata", "%{PIPE}Status:%{NUMBER:StatusCode:int}%{PIPE}%{DATA}%{COLON}%{QUOTES}%{GREEDYDATA}%{COMMA}%{NUMBER:Response_Time:int}%{QUOTES}" ]
remove_field => ["rawdata"]
}
}
if (([operation_Type] =~ /.+/ ) and ("exit" in [operation_Type])) {
mutate { add_field => { "my_format" => "%{message_Service_Name}|Status:%{StatusCode}|Response_Time:%{Response_Time}" }}
}
#logs aggregation logic to combine log line to a single line
if (("entry" in [syslog5424_msg]) and (("bs" in [parent_Service_Name]) or ("ds" in [parent_Service_Name]) or ("as" in [parent_Service_Name]) or ("is" in [parent_Service_Name]))) {
aggregate {
task_id => "%{correlation_Id}"
code => "map['called_Service_Names'] = '' ; map['called_Service_Names_order'] = ''"
map_action => "create"
}

}
if((("send" in [syslog5424_msg]) or ("Send" in [syslog5424_msg])) and ("_bs_" in [parent_Service_Name]) and ([message_Service_Name] =~ /.+/ )) {
aggregate {
task_id => "%{correlation_Id}"
code => "map['called_Service_Names_order'] += event['message_Service_Name']+'||'"
map_action => "update"
}
}
if((("_ds_" in [parent_Service_Name]) or ("_as_" in [parent_Service_Name]) or ("_is_" in [parent_Service_Name])) and ("exit" in [syslog5424_msg]) and ([syslog5424_msg] =~ /.+/ )) {
aggregate {
task_id => "%{correlation_Id}"
code => "map['called_Service_Names'] += event['my_format']+'||'"
map_action => "update"
}
}
if (("exit" in [syslog5424_msg]) and ("_bs_" in [parent_Service_Name])) {
aggregate {
task_id => "%{correlation_Id}"
code => "event['called_Service_Names'] = map['called_Service_Names'] ; event['called_Service_Names_order'] = map['called_Service_Names_order']"
map_action => "update"
end_of_task => true
timeout => 120
}
grok { match => [ "host", "%{WORD:hostserver}.%{GREEDYDATA}" ] }
mutate { add_field => { "jukka_format" => "%{message_timestamp} sessionId=%{correlation_Id} - exit:%{parent_Service_Name}|Status:%{StatusCode}|Response_Time:%{Response_Time}|instance:%{hostserver}|sessionId=%{correlation_Id}|m_type:%{called_Service_Names}%{parent_Service_Name}:%{hostserver}"}}
}

#Remove unwanted fields
if([syslog5424_msg] =~ /.+/ ) { mutate { remove_field => ["syslog5424_msg" , "source" , "offset", "fields", "host"] }}

}

Would using collate together with aggregate be useful for this?

Some additional metrics:

$ vmstat 1 10
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
12 0 2958996 3282928 17976 2381976 1 1 18 10 0 2 15 6 79 0 0
5 0 2958996 3282788 17976 2382360 0 0 252 0 43742 96456 40 16 44 1 0
6 0 2958980 3282344 17976 2382436 44 0 44 0 41796 93239 36 16 48 0 0
4 0 2958976 3277600 17984 2382372 0 0 0 668 37199 80930 45 17 38 0 0
4 0 2958656 3277616 17984 2382436 732 0 732 0 44076 90572 36 20 43 1 0
5 0 2958656 3277460 17984 2382520 0 0 0 272 45221 93967 37 18 45 0 0
3 0 2958656 3277212 17984 2382524 0 0 0 0 42016 89036 36 17 46 0 0
6 0 2958656 3278060 17984 2382528 0 0 0 0 34676 70690 40 20 41 0 0
3 0 2958656 3278236 17992 2382516 0 0 0 108 32833 68561 37 19 44 0 0
4 0 2958656 3278252 17992 2382524 0 0 0 0 37345 75277 39 20 41 0 0

This is rather too much raw, unformatted information to digest, and it doesn't explain the problem you are trying to solve. Do you have a simple example? Can you explain in detail what you're trying to achieve? Regarding the Logstash filters, you might get more help on the Logstash forum.