Hello team,
I needed to analyze SAP audit logs, so Filebeat was installed on the machines to ship the logs to Logstash.
Audit log format: a new file is created every day, and all records are written on a single line (each message is 200 characters); a file can reach 200-500 MB. Here is a fragment of one such line:
2AU520201219000000000986400057D3r-sap-ueUSERQWERTY SAPMSSY1 3001F&0&R r-sap-uerpn04.testin2AUK20201219000000000986400057D3r-sap-ueUSERQWERTY SAPMSSY1 3001ARFC&&ARFC_RUN_NOWAIT r-sap-uerpn04.testin2AU120201219000000000475600062D310.2.0.8ZLE_OPTIMIZE SAPMHTTP 3001H&0&P 10.0.0.1 2AU520201219000001000986400057D3v-sap-psSM_SLM SAPMSSY1 3001S&0&P q-sap-qwert01.testin2AUK20201219000001000986400057D3v-sap-psSM_SLM SAPMSSY1 3001SRFC&&RFC_SYSTEM_INFO q-sap-qwert01.testin2AU520201219000005000475600062D3 SMTMSM1 SAPMSSY1 3001F&0&R 10.0.0.1
The Filebeat configuration is simple: the path to the log files and a Logstash output.
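For reference, the filebeat.yml is essentially just this (the path below is a placeholder; adjust it to the real audit log location):

filebeat.inputs:
  - type: log
    paths:
      - /sapmnt/SID/audit/*.AUD    # placeholder path, not the real directory
output.logstash:
  hosts: ["logstash:5044"]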
I want each line to be split into individual 200-character messages and then parsed into fields.
My basic config looks like this, but it's incomplete and I don't know what to do next:
input {
  beats {
    port => 5044
  }
}

filter {
  # Each record is 200 characters wide and, as in the sample above, starts
  # with "2AU". Note: by default Logstash does not process escape sequences
  # in quoted strings, so "\n" in a gsub replacement is a literal backslash
  # plus "n" unless config.support_escapes is enabled in logstash.yml.
  # Insert an explicit separator character and split on that instead.
  # This assumes the sequence "2AU" never occurs mid-message.
  mutate {
    gsub => [ "message", "2AU", "|2AU" ]
  }
  split {
    field => "message"
    terminator => "|"
  }
  # The separator inserted before the first record yields one empty event; drop it.
  if [message] == "" {
    drop { }
  }
  grok {
    patterns_dir => ["/etc/logstash/conf.d/patterns"]
    match => { "message" => "%{VERS:version}%{MESSAGEID:messageid}%{DATE:date}%{TIMESTAMP:timestamp}%{OSPID:os_pid}%{SAPPID:sap_pid}%{LOGONTYPE:logontype}%{SAPPIDHEX:sap_pid_hex}%{SERVER:server}%{USERNAME:username}%{TRANS:transaction}%{REPORT:report}%{MANDAT:mandat}%{SESSIONID:sessionid}%{PARAMETERS:parameters}%{TERMINAL:terminal}" }
  }
}

output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{+yyyy.MM}"
    user => "user"
    password => "password"
  }
}
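The grok match relies on custom patterns in /etc/logstash/conf.d/patterns. Since the records are fixed-width, I imagine the patterns file needs entries along these lines (the widths are guesses from the sample record above and would have to be verified against the actual SAP audit record layout):

VERS .{1}
MESSAGEID .{3}
DATE \d{8}
TIMESTAMP \d{6}
OSPID .{5}
SAPPID .{5}
LOGONTYPE .{2}
SAPPIDHEX .{2}
SERVER .{8}
USERNAME .{12}
TRANS .{20}
REPORT .{40}
MANDAT .{3}
SESSIONID .{1}
PARAMETERS .{64}
TERMINAL .{20}

Note that DATE, TIMESTAMP, and USERNAME are also names of built-in grok patterns, so renaming the custom ones (e.g. SAPDATE) might avoid ambiguity.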
Am I heading in the right direction? So far it doesn't work.