Logstash pipeline configuration - extract metrics from message field

Hi!
I want to configure a Logstash pipeline.
I receive many logs in bulk format.
Example of one log entry:

{"index":{"_index":"orchestrator-index","_id":"xxx"}}
{"message":"@metrics Exception count: 10","level":"Information","logType":"User","timeStamp":"2023-05-15T09:36:02.3590216+02:00","fingerprint":"xxx","windowsIdentity":"xxx","machineName":"VM","fileName":"xxx","logF_BusinessProcessName":"Framework","processName":"xxx","processVersion":"1.0.18","jobId":"xxx","robotName":"xxxx","machineId":xxx,"organizationUnitId":xxx}

Some processes print "@metrics Exception count: 10" in the message field.
I want to configure the Logstash pipeline to catch these metrics and store them in another field, e.g. Exceptions.
In the end I want to visualise these metrics using Observability. All log messages that include metrics always start with "@metrics metric_type count: number". I want to extract this number into another field that will be interpreted by Elastic/Kibana as a metric.
Thanks for the help!

My current config:

# HTTPS listener protected by basic auth; NDJSON bodies are decoded as bulk requests.
input {
  http {
    # Port the listener binds to.
    port => 19200

    # Basic-auth credentials clients must present.
    user => "elastic"
    password => "pass"

    # TLS: server certificate and private key.
    ssl => true
    ssl_certificate => "C:\elk\logstash-8.7.0\config\certs\cert.crt"
    ssl_key => "C:\elk\logstash-8.7.0\config\certs\key.key"
    # Disabled settings, kept for reference:
    #ssl.certificate_authorities => ["C:\elk\logstash-8.7.0\config\certs\ca.cer"]
    #ssl_verify_mode => "force_peer"

    # Requests sent with this content type are decoded by the es_bulk codec.
    additional_codecs => { "application/x-ndjson" => "es_bulk" }
  }
}

filter {

  # Detect block start/stop markers in the message. Patterns are tried in
  # order and grok stops at the first match, so the more specific
  # "execution started/ended" patterns are listed before the generic
  # "started/ended" ones.
  # NOTE: messages matching none of these patterns (including "@metrics ..."
  # lines) still receive grok's default _grokparsefailure tag from this filter.
  grok {
    match => {
      "message" => [
        "%{DATA:startTitle} - START",
        "%{DATA:stopTitle} - STOP",
        "%{DATA:stopTitle} - END",

        "%{DATA:startTitle} execution started",
        "%{DATA:stopTitle} execution ended",
        "%{DATA:startTitle} started",
        "%{DATA:stopTitle} ended"
      ]
    }
  }

  # Extract metrics from messages of the form
  #   @metrics <metric_type> count: <number>
  # e.g. "@metrics Exception count: 10" -> metricType = "Exception",
  # metricValue = 10 (stored as an integer so it can be aggregated as a
  # numeric metric). Fixed field names are used instead of one field per
  # metric type to keep the index mapping small; split or filter on
  # metricType when visualising.
  grok {
    match => { "message" => "^@metrics %{DATA:metricType} count: %{NUMBER:metricValue:int}" }
    # Most events are not metrics; do not tag them as parse failures.
    tag_on_failure => []
  }

  # Block start: remember the start timestamp, keyed by block title, in the
  # aggregate map shared by all events of the same process.
  # NOTE: the aggregate filter only works reliably with a single pipeline
  # worker (pipeline.workers: 1 / -w 1).
  if [startTitle] =~ ".+" {
    aggregate {
      task_id => "%{processName}"
      code => "map[event.get('startTitle')] = event.get('timeStamp')"
      # Seconds before an unfinished map is discarded (18000 s = 5 h).
      timeout => 18000
    }
  }

  # Block stop: copy the remembered start timestamp onto this event, record
  # the stop timestamp and drop the title from the map.
  if [stopTitle] =~ ".+" {
    aggregate {
      task_id => "%{processName}"
      code => "event.set('blockStartTimeStamp', map[event.get('stopTitle')]); event.set('blockStopTimeStamp', event.get('timeStamp')); map.delete(event.get('stopTitle'))"
    }

    # Compute the duration only when both timestamps are present (no start
    # timestamp is available if the matching start event was never seen).
    if [blockStartTimeStamp] =~ ".+" and [blockStopTimeStamp] =~ ".+" {
      ruby {
        init => "require 'time'"
        # Time - Time yields the difference in seconds as a float.
        code => "event.set('blockDuration', Time.parse(event.get('blockStopTimeStamp')) - Time.parse(event.get('blockStartTimeStamp')))"
      }
    }
  }

}

# Forward events to the local Elasticsearch node, reusing the index name and
# document id found in the es_bulk codec metadata of each event.
output {
  elasticsearch {
    # Target cluster.
    hosts => ["http://localhost:9200"]

    # Credentials used to authenticate against Elasticsearch.
    user => "elastic"
    password => "pass"

    # Destination index and document id taken from the event metadata.
    index => "%{[@metadata][codec][es_bulk][_index]}"
    document_id => "%{[@metadata][codec][es_bulk][_id]}"
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.