Multilevel split filter in Logstash

Hi everyone,

The raw output from my custom API looks like this:

> '[{"sourceSystem":"D365","totalCasesReceived":4,"success":{"count":1,"cases":[{"transactionId":"D0000001775-7","correlationId":"d1c755de-5a03-4dde-adb5-2151c29239f3","source":"D365","serviceEnv":"D","caseNumber":"D0000001775","dateReceived":"2025-02-05T10:00:05.123+00:00","dateSentToTrackWise":"2025-02-05T10:00:06.621+00:00","prId":"3943802","prIdReceived":"2025-02-05T10:04:04.001+00:00","prIdStatus":true,"message":"Successfully Updated TW PR ID"}]},"failedWhileSendingToTW":{"count":0,"cases":[]},"dataValidationError":{"count":0,"cases":[]},"businessAckNotReceived":{"count":3,"cases":[{"transactionId":"D0000001830-13","correlationId":"61aac7a8-303a-495f-b19d-9b7af842f1d4","source":"D365","serviceEnv":"D","caseNumber":"D0000001830","dateReceived":"2025-02-04T09:51:13.499+00:00","dateSentToTrackWise":"2025-02-04T09:51:19.162+00:00","prId":null,"prIdReceived":null,"prIdStatus":false,"message":null},{"transactionId":"D0000001830-13","correlationId":"096ee225-27df-4eb4-89a5-6182463ffb14","source":"D365","serviceEnv":"D","caseNumber":"D0000001830","dateReceived":"2025-02-04T09:51:13.491+00:00","dateSentToTrackWise":"2025-02-04T09:51:20.057+00:00","prId":null,"prIdReceived":null,"prIdStatus":false,"message":null},{"transactionId":"D0000001830-13","correlationId":"890aa054-5d2a-4179-81f2-3dc2a5fc0f5c","source":"D365","serviceEnv":"D","caseNumber":"D0000001830","dateReceived":"2025-02-04T09:54:44.723+00:00","dateSentToTrackWise":"2025-02-04T09:54:58.038+00:00","prId":null,"prIdReceived":null,"prIdStatus":false,"message":null}]}}]'

This output contains four nested arrays: success.cases, failedWhileSendingToTW.cases, dataValidationError.cases, and businessAckNotReceived.cases. I want to extract these as well, for better dashboarding and alerting.

Our main goal is to show the total count for each case type and, where documents exist, to show which caseNumber or prId belongs to each case.

I think the solution is to apply split to each array so that we get an individual document per case and can use the same fields in dashboards and alerts.

This is the filter part of my current Logstash pipeline:

     ##### Splitting logic
     # 1. Mutate to create separate standard fields for easier processing
     mutate { add_field => { "sourceSystem" => "%{[api2_response][0][sourceSystem]}" } }
     mutate { add_field => { "totalCasesReceived" => "%{[api2_response][0][totalCasesReceived]}" } }
     mutate { add_field => {  "failedWhileSendingToTWCount" => "%{[api2_response][0][failedWhileSendingToTW][count]}" } }
     mutate { add_field => {  "successCount" => "%{[api2_response][0][success][count]}" } }
     mutate { add_field => {  "dataValidationErrorCount" => "%{[api2_response][0][dataValidationError][count]}" } }
     mutate { add_field => {  "businessAckNotReceivedCount" => "%{[api2_response][0][businessAckNotReceived][count]}" } }

     mutate { add_field => {  "failedWhileSendingToTWCases" => "{%{[api2_response][0][failedWhileSendingToTW][cases]}}" } }
     mutate { add_field => {  "successCases" => "{%{[api2_response][0][success][cases]}}" } }
     mutate { add_field => {  "dataValidationErrorCases" => "{%{[api2_response][0][dataValidationError][cases]}}" } }
     mutate { add_field => {  "businessAckNotReceivedCases" => "{%{[api2_response][0][businessAckNotReceived][cases]}}" } } 
     mutate { remove_field => [ "access_token", "token_response"] }

This is the stdout output from Logstash:

> {
       "dataValidationErrorCount" => "0",
    "businessAckNotReceivedCount" => "3",
                   "successCases" => "{{caseNumber=D0000001775, dateReceived=2025-02-05T10:00:05.123+00:00, serviceEnv=D, prId=3943802, correlationId=d1c755de-5a03-4dde-adb5-2151c29239f3, source=D365, message=Successfully Updated TW PR ID, prIdReceived=2025-02-05T10:04:04.001+00:00, prIdStatus=true, dateSentToTrackWise=2025-02-05T10:00:06.621+00:00, transactionId=D0000001775-7}}",
                       "@version" => "1",
       "dataValidationErrorCases" => "{}",
                  "api2_response" => [
        [0] {
                      "sourceSystem" => "D365",
            "failedWhileSendingToTW" => {
                "cases" => [],
                "count" => 0
            },
                           "success" => {
                "cases" => [
                    [0] {
                                 "caseNumber" => "D0000001775",
                                    "message" => "Successfully Updated TW PR ID",
                               "prIdReceived" => "2025-02-05T10:04:04.001+00:00",
                                     "source" => "D365",
                              "correlationId" => "d1c755de-5a03-4dde-adb5-2151c29239f3",
                               "dateReceived" => "2025-02-05T10:00:05.123+00:00",
                                 "prIdStatus" => true,
                                 "serviceEnv" => "D",
                                       "prId" => "3943802",
                        "dateSentToTrackWise" => "2025-02-05T10:00:06.621+00:00",
                              "transactionId" => "D0000001775-7"
                    }
                ],
                "count" => 1
            },
            "businessAckNotReceived" => {
                "cases" => [
                    [0] {
                                 "caseNumber" => "D0000001830",
                                    "message" => nil,
                               "prIdReceived" => nil,
                                     "source" => "D365",
                              "correlationId" => "61aac7a8-303a-495f-b19d-9b7af842f1d4",
                               "dateReceived" => "2025-02-04T09:51:13.499+00:00",
                                 "prIdStatus" => false,
                                 "serviceEnv" => "D",
                                       "prId" => nil,
                        "dateSentToTrackWise" => "2025-02-04T09:51:19.162+00:00",
                              "transactionId" => "D0000001830-13"
                    },
                    [1] {
                                 "caseNumber" => "D0000001830",
                                    "message" => nil,
                               "prIdReceived" => nil,
                                     "source" => "D365",
                              "correlationId" => "096ee225-27df-4eb4-89a5-6182463ffb14",
                               "dateReceived" => "2025-02-04T09:51:13.491+00:00",
                                 "prIdStatus" => false,
                                 "serviceEnv" => "D",
                                       "prId" => nil,
                        "dateSentToTrackWise" => "2025-02-04T09:51:20.057+00:00",
                              "transactionId" => "D0000001830-13"
                    },
                    [2] {
                                 "caseNumber" => "D0000001830",
                                    "message" => nil,
                               "prIdReceived" => nil,
                                     "source" => "D365",
                              "correlationId" => "890aa054-5d2a-4179-81f2-3dc2a5fc0f5c",
                               "dateReceived" => "2025-02-04T09:54:44.723+00:00",
                                 "prIdStatus" => false,
                                 "serviceEnv" => "D",
                                       "prId" => nil,
                        "dateSentToTrackWise" => "2025-02-04T09:54:58.038+00:00",
                              "transactionId" => "D0000001830-13"
                    }
                ],
                "count" => 3
            },
               "dataValidationError" => {
                "cases" => [],
                "count" => 0
            },
                "totalCasesReceived" => 4
        }
    ],
    "failedWhileSendingToTWCases" => "{}",
                     "@timestamp" => 2025-02-05T22:36:16.792587Z,
                        "to_time" => "05-02-2025 22:36:16",
             "totalCasesReceived" => "4",
                   "sourceSystem" => "D365",
    "failedWhileSendingToTWCount" => "0",
    "businessAckNotReceivedCases" => "{{caseNumber=D0000001830, dateReceived=2025-02-04T09:51:13.499+00:00, serviceEnv=D, prId=null, correlationId=61aac7a8-303a-495f-b19d-9b7af842f1d4, source=D365, message=null, prIdReceived=null, prIdStatus=false, dateSentToTrackWise=2025-02-04T09:51:19.162+00:00, transactionId=D0000001830-13},{caseNumber=D0000001830, dateReceived=2025-02-04T09:51:13.491+00:00, serviceEnv=D, prId=null, correlationId=096ee225-27df-4eb4-89a5-6182463ffb14, source=D365, message=null, prIdReceived=null, prIdStatus=false, dateSentToTrackWise=2025-02-04T09:51:20.057+00:00, transactionId=D0000001830-13},{caseNumber=D0000001830, dateReceived=2025-02-04T09:54:44.723+00:00, serviceEnv=D, prId=null, correlationId=890aa054-5d2a-4179-81f2-3dc2a5fc0f5c, source=D365, message=null, prIdReceived=null, prIdStatus=false, dateSentToTrackWise=2025-02-04T09:54:58.038+00:00, transactionId=D0000001830-13}}",
                   "successCount" => "1",
                      "from_time" => "29-01-2025 22:36:16"
}

I tried a couple of things before settling on this, but my current output still does not meet the requirement. I need help to effectively split the four case arrays mentioned above.

Try 1

  # Split the main `api2_response` array first
  split {
    field => "api2_response"
    target => "out"
  }

  split {
    field => "[out][businessAckNotReceived][cases]"
    target => "[businessAckNotReceived][cases]"
  }
  split {
    field => "[out][success][cases]"
    target => "[success][cases]"
  }

In Try 1, the businessAckNotReceived cases are split as expected, but the split on the success cases does not work.

Try 2: used if conditionals

####

    split { field => "api2_response" }


### Splitting logic

  if [api2_response][businessAckNotReceived][cases] {
    split {
      field => "[api2_response][businessAckNotReceived][cases]"
      target => "[businessAckNotReceived][cases]"
    }
  }
  else if [api2_response][failedWhileSendingToTW][cases] {
    split {
      field => "[api2_response][failedWhileSendingToTW][cases]"
      target => "[failedWhileSendingToTW][cases]"
    }
  }
  else if [api2_response][success][cases] {
    split {
      field => "[api2_response][success][cases]"
      target => "[success][cases]"
    }
  }
  else if [api2_response][dataValidationError][cases] {
    split {
      field => "[api2_response][dataValidationError][cases]"
      target => "[dataValidationError][cases]"
    }
  }

The result is the same: we do not get the expected output.

Try 3


  if [api2_response][businessAckNotReceived][cases] {
    split {
      field => "[api2_response][businessAckNotReceived][cases]"
      target => "[api2_response][businessAckNotReceived][case]"
    }
    mutate {
      remove_field => ["[api2_response][businessAckNotReceived][cases]"]
    }
  }

  # Split `failedWhileSendingToTW.cases` into separate events
  if [api2_response][failedWhileSendingToTW][cases] {
    split {
      field => "[api2_response][failedWhileSendingToTW][cases]"
      target => "[api2_response][failedWhileSendingToTW][case]"
    }
    mutate {
      remove_field => ["[api2_response][failedWhileSendingToTW][cases]"]
    }
  }

  # Split `success.cases` into separate events
  if [api2_response][success][cases] {
    split {
      field => "[api2_response][success][cases]"
      target => "[api2_response][success][case]"
    }
    mutate {
      remove_field => ["[api2_response][success][cases]"]
    }
  }
  # Split `dataValidationError.cases` into separate events
  if [api2_response][dataValidationError][cases] {
    split {
      field => "[api2_response][dataValidationError][cases]"
      target => "[api2_response][dataValidationError][case]"
    }
    mutate {
      remove_field => ["[api2_response][dataValidationError][cases]"]
    }
  }

In the tries above, I noticed that the first split worked, but the remaining case arrays were copied into every event produced by that first split and were then split again, so I ended up with duplicates.

I also tried ruby, but it did not work as expected, and since (as far as I understand) ruby cannot produce new documents, I did not go further.

Please help me resolve this.

Are you saying that you want each failedWhileSendingToTW / success / businessAckNotReceived / dataValidationError response in a separate event?

Just using split will result in three events for businessAckNotReceived, but each of them will also carry the success object (and the empty failedWhileSendingToTW / dataValidationError objects).
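To make that concrete, here is a rough plain-Ruby sketch of chained splits. It is only a simplified model, not the split plugin's code: hashes stand in for events, `split_events` is a hypothetical helper, and the flat field names `ackCases` / `successCases` are made up to keep the example short.

```ruby
# Simplified stand-in for a split: one deep copy of the event per array
# element, with that element stored under `target`. Plain hashes play the
# role of events; this is not the plugin's implementation.
def split_events(events, field, target)
  events.flat_map do |event|
    event.fetch(field).map do |element|
      copy = Marshal.load(Marshal.dump(event)) # deep copy of the whole event
      copy[target] = element
      copy
    end
  end
end

# Correlation ids shortened from the sample for readability.
event = {
  "ackCases"     => [{ "correlationId" => "61aac7a8" },
                     { "correlationId" => "096ee225" },
                     { "correlationId" => "890aa054" }],
  "successCases" => [{ "prId" => "3943802" }]
}

after_first  = split_events([event], "ackCases", "ackCase")
after_second = split_events(after_first, "successCases", "successCase")

# Three events come out, and every one of them carries the same success case.
after_second.each do |e|
  puts "ack=#{e['ackCase']['correlationId']} success prId=#{e['successCase']['prId']}"
end
```

Each split multiplies the events that already exist, so splitting one array after another pairs up cases from different buckets instead of giving one event per case.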

You could try something like

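    json { source => "message" target => "[@metadata][response]" remove_field => [ "message" ] }
    ruby {
        code => '
            # Gather the cases from all four buckets into one array,
            # tagging each case with the bucket it came from.
            cases = []
            [ "dataValidationError", "failedWhileSendingToTW",
              "success", "businessAckNotReceived" ].each { |x|
                a = event.remove("[@metadata][response][0][#{x}][cases]")
                # event.remove returns nil when the field is absent
                next unless a.is_a?(Array)
                a.each { |aCase|
                    aCase["type"] = x
                    cases << aCase
                }
            }
            event.set("[case]", cases)
        '
    }
    # One event per collected case
    split { field => "[case]" }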

but there are many other ways to structure the data.
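If you want to check the flattening logic outside Logstash first, something like the following standalone Ruby sketch may help. It works on plain hashes rather than a Logstash event, `flatten_cases` and `CASE_TYPES` are names I made up for illustration, and the JSON is a trimmed version of the response from the question with most fields left out.

```ruby
require "json"

CASE_TYPES = %w[dataValidationError failedWhileSendingToTW success businessAckNotReceived].freeze

# Collect every case from the four buckets into one flat array, tagging each
# with the bucket it came from. Missing or empty buckets contribute nothing.
def flatten_cases(bucketed)
  CASE_TYPES.flat_map do |type|
    cases = bucketed.dig(type, "cases")
    next [] unless cases.is_a?(Array)
    cases.map { |a_case| a_case.merge("type" => type) }
  end
end

# Trimmed version of the response from the question (most fields omitted).
raw = <<~JSON
  [{"sourceSystem":"D365","totalCasesReceived":4,
    "success":{"count":1,"cases":[{"caseNumber":"D0000001775","prId":"3943802"}]},
    "failedWhileSendingToTW":{"count":0,"cases":[]},
    "dataValidationError":{"count":0,"cases":[]},
    "businessAckNotReceived":{"count":3,"cases":[
      {"caseNumber":"D0000001830","prId":null},
      {"caseNumber":"D0000001830","prId":null},
      {"caseNumber":"D0000001830","prId":null}]}}]
JSON

flat = flatten_cases(JSON.parse(raw).first)
flat.each { |c| puts "#{c['type']}: #{c['caseNumber']} prId=#{c['prId'].inspect}" }
```

With the array flattened like this, a single split produces one event per case, and the `type` field tells you which bucket each case came from, so per-type counts can be computed in the dashboard rather than carried on every event.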