Logstash Split Message with Multiple Messages

This is the log file I need to ingest. There are 3 separate messages under alerts, and I need to split them into 3 separate events.

{
   "@timestamp": "2023-05-23T18:15:30.537972Z",
   "alerts": [
      {
         "startsAt": "2023-05-22T19:47:45.055Z",
         "generatorURL": "REDACTED",
         "annotations": {
            "description": "REDACTED",
            "summary": "REDACTED"
         },
         "endsAt": "2023-05-23T18:14:15.055Z",
         "fingerprint": "REDACTED",
         "status": "resolved",
         "labels": {
            "severity": "info",
            "group": "REDACTED",
            "clusterName": "REDACTED",
            "managed_cluster": "REDACTED",
            "alertname": "REDACTED",
            "namespace": "REDACTED",
            "datacenter": "REDACTED",
            "resource": "REDACTED",
            "openshift_io_alert_source": "REDACTED",
            "version": "REDACTED",
            "prometheus": "REDACTED"
         }
      },
      {
         "startsAt": "2023-05-22T19:39:45.055Z",
         "generatorURL": "REDACTED",
         "annotations": {
            "description": "REDACTED",
            "summary": "REDACTED"
         },
         "endsAt": "0001-01-01T00:00:00Z",
         "fingerprint": "REDACTED",
         "status": "firing",
         "labels": {
            "severity": "info",
            "group": "REDACTED",
            "clusterName": "REDACTED",
            "managed_cluster": "REDACTED",
            "alertname": "REDACTED",
            "namespace": "REDACTED",
            "datacenter": "REDACTED",
            "resource": "REDACTED",
            "openshift_io_alert_source": "REDACTED",
            "version": "REDACTED",
            "prometheus": "REDACTED"
         }
      },
      {
         "startsAt": "2023-05-22T19:31:45.055Z",
         "generatorURL": "REDACTED",
         "annotations": {
            "description": "REDACTED",
            "summary": "REDACTED"
         },
         "endsAt": "0001-01-01T00:00:00Z",
         "fingerprint": "REDACTED",
         "status": "firing",
         "labels": {
            "severity": "info",
            "group": "REDACTED",
            "clusterName": "REDACTED",
            "managed_cluster": "REDACTED",
            "alertname": "REDACTED",
            "namespace": "REDACTED",
            "datacenter": "REDACTED",
            "resource": "REDACTED",
            "openshift_io_alert_source": "REDACTED",
            "version": "REDACTED",
            "prometheus": "REDACTED"
         }
      }
   ]
}

Here's my input, filter, and output.

input {
   file {
      path => "/test/data.json"
      type => "json"
      start_position => "beginning"
      sincedb_path => "/dev/null"
   }
}

filter {
  json {
    source => "message" 
    target => "parsed" 
  }

  split {
    field => "[parsed][alerts]" 
  }

  mutate {
    rename => { "[parsed][alerts]" => "alert" }
    remove_field => ["message", "[event][original]"]
  }
}

output {
   stdout {
      codec => rubydebug
   }
}

I'm getting a bunch of errors like this.

[2023-05-26T17:25:44,718][WARN ][logstash.filters.json    ][main][9c9e60c1f6a8ad74e552a52ec81721ad5ffba1fdfbaa56844f853c10d13b0ed7] Error parsing json {:source=>"message", :raw=>"         }", :exception=>#<LogStash::Json::ParserError: Unexpected close marker '}': expected ']' (for root starting at [Source: (byte[])"         }"; line: 1, column: 0])
 at [Source: (byte[])"         }"; line: 1, column: 11]>}
[2023-05-26T17:25:44,719][WARN ][logstash.filters.split   ][main][c9a83c2b41eab0270862dafa7c37852ab6af053c3a1faf4c320fe8cc1cbbcdc1] Only String and Array types are splittable. field:[parsed][alerts] is of type = NilClass

In summary, I want everything from alerts.startsAt through alerts.labels.prometheus emitted as a separate log for each element of alerts.

Extra question: what would the mutate look like to add datacenter (alerts.labels.datacenter) as a field at the root of each new log? I believe it's a mutate, but I'm not sure how it would work.

How is the JSON laid out in the file /test/data.json? The file input emits one event per line, so it needs one JSON document per line. If your JSON spans multiple lines (pretty-printed), you will need to use a multiline codec in your input.
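As a rough sketch of the multiline approach: assuming the file is pretty-printed with each document's opening { at the very start of a line (as in the sample above), the input could look something like this. The pattern and the auto_flush_interval value are assumptions on my part, so adjust them to your data.

```
input {
   file {
      path => "/test/data.json"
      start_position => "beginning"
      sincedb_path => "/dev/null"
      codec => multiline {
         # A line with "{" in the first column starts a new document
         pattern => "^\{"
         # Every line that does NOT match the pattern...
         negate => true
         # ...is appended to the previous line
         what => "previous"
         # Emit the last document after 2 seconds without new lines,
         # since no following "{" line arrives to close it
         auto_flush_interval => 2
      }
   }
}
```

With the whole document arriving as a single message, the json filter should be able to parse it, and split then has an actual array to work on in [parsed][alerts].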

@balogan RE mutate filter

To add a field alerts.labels.datacenter using the mutate filter in Logstash, you can use the following configuration:

filter { mutate { add_field => { "alerts.labels.datacenter" => "your_value_here" } } }

Replace "your_value_here" with the desired value for the alerts.labels.datacenter field. This configuration will add the field to the event and assign the specified value to it.

If you want to assign the value dynamically based on another field, you can use the Logstash event field reference syntax. Here's an example:

filter { mutate { add_field => { "alerts.labels.datacenter" => "%{your_field_name}" } } }

Replace "your_field_name" with the actual name of the field that contains the value you want to assign to alerts.labels.datacenter. The %{} syntax allows you to reference the value of another field within Logstash.

Remember to adjust the rest of your Logstash pipeline configuration according to your requirements.

Just to add that nested fields need to be referenced using square brackets in Logstash, so it needs to be [alerts][labels][datacenter]. Using dots will create a field with literal dots in its name.
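For the original question (getting the datacenter label to the root of each split event), a minimal sketch could look like the following. It assumes the split element has already been renamed to alert, as in the pipeline posted above, so the source path is [alert][labels][datacenter]. The root field name datacenter is only an illustrative choice.

```
filter {
  # Separate mutate placed after the split and rename,
  # so that [alert] already exists on each event
  mutate {
    copy => { "[alert][labels][datacenter]" => "datacenter" }
  }
}
```

An add_field with "%{[alert][labels][datacenter]}" as the value should give a similar result. Either way, the nested source field is referenced with square brackets rather than dots.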
