Hi!
I want to confgure logstash pipeline.
I got many logs in bulk format:
ex of one log entry:
{"index":{"_index":"orchestrator-index","_id":"xxx"}}
{"message":"@metrics Exception count: 10","level":"Information","logType":"User","timeStamp":"2023-05-15T09:36:02.3590216+02:00","fingerprint":"xxx","windowsIdentity":"xxx","machineName":"VM","fileName":"xxx","logF_BusinessProcessName":"Framework","processName":"xxx,"processVersion":"1.0.18","jobId":"xxx","robotName":"xxxx","machineId":xxx,"organizationUnitId":xxx}
Some processes in message field prints @metrics Exeption count: 10
I want to configure logstash pipeline to catch this metrics and store in another field ex. Excepitons
At the end i want to visualise ths metrics using observability. All log messeges with metrics included always starts with @metrics metric_type count: number I want to extract this number to another fields which will be interprated by elastic/kibana as metrics.
Thanks, for help!
My current config:
input {
http {
port => 19200
user => "elastic"
ssl => true
#ssl.certificate_authorities => ["C:\elk\logstash-8.7.0\config\certs\ca.cer"]
ssl_certificate => "C:\elk\logstash-8.7.0\config\certs\cert.crt"
ssl_key => "C:\elk\logstash-8.7.0\config\certs\key.key"
#ssl_verify_mode => "force_peer"
password => "pass"
additional_codecs => {
"application/x-ndjson" => "es_bulk"
}
}
}
filter {
grok {
match => {
"message" => [
"%{DATA:startTitle} - START",
"%{DATA:stopTitle} - STOP",
"%{DATA:stopTitle} - END",
"%{DATA:startTitle} execution started",
"%{DATA:stopTitle} execution ended",
"%{DATA:startTitle} started",
"%{DATA:stopTitle} ended"
]
}
}
if [startTitle] =~ ".+" {
aggregate {
task_id => "%{processName}"
code => "map[event.get('startTitle')] = event.get('timeStamp')"
timeout => 18000
}
}
if [stopTitle] =~ ".+" {
aggregate {
task_id => "%{processName}"
code => "event.set('blockStartTimeStamp', map[event.get('stopTitle')]); event.set('blockStopTimeStamp', event.get('timeStamp')); map.delete(event.get('stopTitle'))"
}
if [blockStartTimeStamp] =~".+" and [blockStopTimeStamp] =~".+" {
ruby {
init => "require 'time'"
code => "event.set('blockDuration', Time.parse(event.get('blockStopTimeStamp')) - Time.parse(event.get('blockStartTimeStamp')))"
}
} else {
}
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][codec][es_bulk][_index]}"
document_id => "%{[@metadata][codec][es_bulk][_id]}"
user => "elastic"
password => "pass"
}
}