Hi,
We are using Filebeat to collect logs from various folders. For example, a service A calls A1, A2, and A3. All the logs are written to their respective folders, and Filebeat pushes them to Logstash. In Logstash we receive the lines and concatenate them in the same order in which service A makes its calls.
In our Filebeat configuration we set backoff_factor to .000001s. Because of this, CPU usage reaches around 40%. If we set the backoff factor to 1s, we can no longer concatenate the logs correctly, and some of the logs are missed because of the 1s delay.
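For reference, here is a minimal sketch of the prospector settings that control this polling, assuming Filebeat 1.x; the paths and values are placeholders, not our production settings:

filebeat:
  prospectors:
    - paths:
        - /logs/A1/*.log   # hypothetical path; one prospector per service folder
      backoff: 1s          # wait after reaching EOF before checking the file again
      backoff_factor: 2    # multiplier applied to the wait on each idle check
      max_backoff: 10s     # upper bound for the backoff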
A low backoff makes Filebeat probe files for new content more often. The lines read from the log files are forwarded to the spooler, which has a flush timeout of 1s by default (consider reducing the flush timeout). I've no idea about the timings required by the aggregation plugin, though; maybe check the docs.
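If the spooler is the bottleneck, these are the settings to tune; a sketch assuming Filebeat 1.x, where the flush timeout is called idle_timeout:

filebeat:
  spool_size: 2048   # events buffered before a flush is forced
  idle_timeout: 1s   # flush the spooler after this interval even if spool_size is not reached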
Instead of doing the aggregation in Logstash, can we shift the process to Elasticsearch? Is it possible to write the output from Elasticsearch to a file?
What we need is the final output. In the main flow, the Main service calls A1, A2, etc. On the Main service's exit we have the response code and time. For each service called from the main flow, we need to capture the response time and code in the final output.
We are crawling at .000001s because each service's logs are in a separate folder.
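Elasticsearch itself has no file output, but Logstash does; a sketch of a file output writing the combined line, where the path is a placeholder and jukka_format is the field built in the filter below:

output {
  file {
    path => "/var/log/aggregated/%{correlation_Id}.log"   # hypothetical target path
    codec => line { format => "%{jukka_format}" }          # emit only the combined line
  }
}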
Second phase
filter {
  # Parse the raw log line into StatusCode and Response_Time.
  if ([rawdata] =~ /.+/) {
    grok {
      patterns_dir => "/dummy_patterns/logstash-2.1.1/conf.d/patterns"
      match => [ "rawdata", "%{PIPE}Status:%{NUMBER:StatusCode:int}%{PIPE}%{DATA}%{COLON}%{QUOTES}%{GREEDYDATA}%{COMMA}%{NUMBER:Response_Time:int}%{QUOTES}" ]
      remove_field => ["rawdata"]
    }
  }

  # On a called service's exit line, build its per-service summary.
  if (([operation_Type] =~ /.+/) and ("exit" in [operation_Type])) {
    mutate { add_field => { "my_format" => "%{message_Service_Name}|Status:%{StatusCode}|Response_Time:%{Response_Time}" } }
  }

  # Logs aggregation logic to combine the log lines into a single line.
  # Entry of the main service: create an aggregation map for this correlation id.
  if (("entry" in [syslog5424_msg]) and (("bs" in [parent_Service_Name]) or ("ds" in [parent_Service_Name]) or ("as" in [parent_Service_Name]) or ("is" in [parent_Service_Name]))) {
    aggregate {
      task_id => "%{correlation_Id}"
      code => "map['called_Service_Names'] = '' ; map['called_Service_Names_order'] = ''"
      map_action => "create"
    }
  }

  # Main service sends a request: record the order in which services are called.
  if ((("send" in [syslog5424_msg]) or ("Send" in [syslog5424_msg])) and ("_bs_" in [parent_Service_Name]) and ([message_Service_Name] =~ /.+/)) {
    aggregate {
      task_id => "%{correlation_Id}"
      code => "map['called_Service_Names_order'] += event['message_Service_Name'] + '||'"
      map_action => "update"
    }
  }

  # A called service exits: append its summary (the my_format field built above).
  if ((("_ds_" in [parent_Service_Name]) or ("_as_" in [parent_Service_Name]) or ("_is_" in [parent_Service_Name])) and ("exit" in [syslog5424_msg]) and ([syslog5424_msg] =~ /.+/)) {
    aggregate {
      task_id => "%{correlation_Id}"
      code => "map['called_Service_Names'] += event['my_format'] + '||'"
      map_action => "update"
    }
  }

  # Main service exits: copy the aggregated fields onto the event and close the task.
  if (("exit" in [syslog5424_msg]) and ("_bs_" in [parent_Service_Name])) {
    aggregate {
      task_id => "%{correlation_Id}"
      code => "event['called_Service_Names'] = map['called_Service_Names'] ; event['called_Service_Names_order'] = map['called_Service_Names_order']"
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
    grok { match => [ "host", "%{WORD:hostserver}.%{GREEDYDATA}" ] }
    mutate { add_field => { "jukka_format" => "%{message_timestamp} sessionId=%{correlation_Id} - exit:%{parent_Service_Name}|Status:%{StatusCode}|Response_Time:%{Response_Time}|instance:%{hostserver}|sessionId=%{correlation_Id}|m_type:%{called_Service_Names}%{parent_Service_Name}:%{hostserver}" } }
  }
}
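One caveat with this approach: the aggregate filter only works correctly with a single filter worker, because all events sharing a correlation_Id must pass through the same filter instance. With Logstash 2.x that means starting it with -w 1 (the config path here is a placeholder):

bin/logstash -w 1 -f /dummy_patterns/logstash-2.1.1/conf.d/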
This is rather too much raw, unformatted information to digest, and it doesn't explain the problem you're trying to solve. Do you have a simple example? Can you explain in detail what you're trying to achieve? Regarding the Logstash filters, you might get more help on the Logstash forum.