Split my JSON input in Logstash and push to ES

Hi, I have a gzipped JSON payload coming from Kafka and I need to push it to ES after some transformations. With the help of this forum, I was able to solve some of my problems. Right now I need to split the decompressed JSON into smaller ones and push them to ES as individual documents. For example, if my JSON is:

{
  "contractId": "6f1c3b32-0bc0-45e8-b92d-2dce6ee98799",
  "version": "v4",
  "basketId": "1b600538-c734-4612-8b66-7b95116ad78c",
  "itemCount": 5,
  "items": [
    {
      "entityErrors": [],
      "id": "10001",
      "basketItemId": "66661",
      "lineItemNumber": "1",
      "quantity": 1,
      "price": "EA"
    },
    {
      "entityErrors": [],
      "id": "10002",
      "basketItemId": "66662",
      "lineNumber": "2",
      "quantity": 1,
      "price": "EA"
    },
    {
      "entityErrors": [],
      "id": "10003",
      "basketItemId": "66663",
      "lineItemNumber": "3",
      "quantity": 2,
      "price": "EA"
    }
  ],
  "payments": [
    {
      "id": "9c79fc79-9eb1-4f60-9417-835bdd82ace0",
      "payment": {
        "result": "SUCCESS",
        "version": "v1",
        "resultreason": "ZZZZ",
        "resultaction": "NOT_APPLICABLE",
        "clientreqid": "9994679EBB4",
        "card": "CREDITCARD.11111111"
      }
    }
  ]
}

I want to split the above JSON into 3 separate documents based on the array
json.items, like the ones below, and push the individual documents to ES.

Document 1 = array element 1 + common elements from root level

  {
    "entityErrors": [],
    "id": "10001",
    "basketItemId": "66661",
    "lineItemNumber": "1",
    "quantity": 1,
    "price": "EA",
    "contractId": "6f1c3b32-0bc0-45e8-b92d-2dce6ee98799",
    "version": "v4",
    "basketId": "1b600538-c734-4612-8b66-7b95116ad78c",
    "itemCount": 5
  }

Document 2 = array element 2 + common elements from root level

  {
    "entityErrors": [],
    "id": "10002",
    "basketItemId": "66662",
    "lineNumber": "2",
    "quantity": 1,
    "price": "EA",
    "contractId": "6f1c3b32-0bc0-45e8-b92d-2dce6ee98799",
    "version": "v4",
    "basketId": "1b600538-c734-4612-8b66-7b95116ad78c",
    "itemCount": 5
  }

Document 3 = array element 3 + common elements from root level

  {
    "entityErrors": [],
    "id": "10003",
    "basketItemId": "66663",
    "lineItemNumber": "3",
    "quantity": 2,
    "price": "EA",
    "contractId": "6f1c3b32-0bc0-45e8-b92d-2dce6ee98799",
    "version": "v4",
    "basketId": "1b600538-c734-4612-8b66-7b95116ad78c",
    "itemCount": 5
  }

The latest thing that I tried is the following but it isn't working.

# Filter stage as attempted by the asker (reported in the post as not working).
filter {

    ## decompress from GZIP to readable text
    // ruby code to decompress here - works fine

    # Discard the original/compressed payload fields listed below.
    mutate {
        remove_field => ["[_source][event][original]", "[payload][compressedPayload]", "[event][original]"]
    }

    ## Convert the decompressed text to json format
    # Reads the string in "decompressedJson", stores the parsed result under
    # [json], and tags the event with _message_json_parsed.
    json {
        source => "decompressedJson"
        target => "json"
        add_tag => ["_message_json_parsed"]
    }

    ## Allow documents with status SEND_TO_OMS only
    # Any event whose [json][status] differs from "SEND_TO_OMS" is dropped here.
    if [json][status] != "SEND_TO_OMS" {
      drop {}
    }

    # NOTE(review): mutate's "split" option appears to split a string field
    # into an array by a separator; it presumably does not fan array elements
    # out into separate events (the standalone split filter does) - confirm
    # against the mutate filter documentation.
    # NOTE(review): "[json][id]}" is not written as a %{...} field reference,
    # so it would presumably be added as a literal string - confirm.
    mutate {
        remove_field => [ "decompressedJson" ]
        split => { field => "[json][items]" }
        add_field => { "contractId" => "[json][id]}" }
    }
}

# Output stage: no conditional is used, so every event that reaches this
# stage is sent to Elasticsearch.
output {
    elasticsearch {
        # Elasticsearch endpoint, written as host:port.
        hosts => "elasticsearch-host:80"
        # Name of the index the events are written to.
        index => "json-stage"
    }
}

Try using

# Step 1: fan out the event. The split filter emits one event per element of
# the [json][items] array; in each emitted event [json][items] holds that
# single element instead of the whole array. The decompressed source string
# and the payments array are not wanted in the per-item documents, so they
# are removed here.
split {
    field => "[json][items]"
    remove_field => [ "decompressedJson", "[json][payments]" ]
}

# Step 2: flatten. Copy the common root-level keys of [json] and then the keys
# of the single item to the top level of the event. The item keys are copied
# last, so on a name collision the item value wins.
#
# The wrapper [json] is removed only after a successful copy. If [json] or
# [json][items] is missing or is not an object, the event is left untouched
# and tagged with _item_flatten_failure so the problem is visible instead of
# being silently swallowed (and the payload silently deleted).
ruby {
    code => '
        root = event.get("[json]")
        item = event.get("[json][items]")
        if root.is_a?(Hash) && item.is_a?(Hash)
            root.each { |k, v| event.set(k, v) unless k == "items" }
            item.each { |k, v| event.set(k, v) }
            event.remove("json")
        else
            event.tag("_item_flatten_failure")
        end
    '
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.