Hi, I have a gzipped JSON coming from Kafka and I need to push it to ES after some transformations. With the help of this forum, I was able to solve some of my problems. Right now I need to split the decompressed JSON into smaller documents and push them to ES individually. For example, if my JSON is
{
  "contractId": "6f1c3b32-0bc0-45e8-b92d-2dce6ee98799",
  "version": "v4",
  "basketId": "1b600538-c734-4612-8b66-7b95116ad78c",
  "itemCount": 5,
  "items": [
    {
      "entityErrors": [],
      "id": "10001",
      "basketItemId": "66661",
      "lineItemNumber": "1",
      "quantity": 1,
      "price": "EA"
    },
    {
      "entityErrors": [],
      "id": "10002",
      "basketItemId": "66662",
      "lineNumber": "2",
      "quantity": 1,
      "price": "EA"
    },
    {
      "entityErrors": [],
      "id": "10003",
      "basketItemId": "66663",
      "lineItemNumber": "3",
      "quantity": 2,
      "price": "EA"
    }
  ],
  "payments": [
    {
      "id": "9c79fc79-9eb1-4f60-9417-835bdd82ace0",
      "payment": {
        "result": "SUCCESS",
        "version": "v1",
        "resultreason": "ZZZZ",
        "resultaction": "NOT_APPLICABLE",
        "clientreqid": "9994679EBB4",
        "card": "CREDITCARD.11111111"
      }
    }
  ]
}
I want to split the above JSON into three separate documents based on the json.items array, like the ones below, and push each one to ES as an individual document.
Document 1 = array element 1 + common elements from root level
{
  "entityErrors": [],
  "id": "10001",
  "basketItemId": "66661",
  "lineItemNumber": "1",
  "quantity": 1,
  "price": "EA",
  "contractId": "6f1c3b32-0bc0-45e8-b92d-2dce6ee98799",
  "version": "v4",
  "basketId": "1b600538-c734-4612-8b66-7b95116ad78c",
  "itemCount": 5
}
Document 2 = array element 2 + common elements from root level
{
  "entityErrors": [],
  "id": "10002",
  "basketItemId": "66662",
  "lineNumber": "2",
  "quantity": 1,
  "price": "EA",
  "contractId": "6f1c3b32-0bc0-45e8-b92d-2dce6ee98799",
  "version": "v4",
  "basketId": "1b600538-c734-4612-8b66-7b95116ad78c",
  "itemCount": 5
}
Document 3 = array element 3 + common elements from root level
{
  "entityErrors": [],
  "id": "10003",
  "basketItemId": "66663",
  "lineItemNumber": "3",
  "quantity": 2,
  "price": "EA",
  "contractId": "6f1c3b32-0bc0-45e8-b92d-2dce6ee98799",
  "version": "v4",
  "basketId": "1b600538-c734-4612-8b66-7b95116ad78c",
  "itemCount": 5
}
The latest thing that I tried is the following, but it still doesn't produce documents like the ones above:
filter {
  ## Decompress from GZIP to readable text (the ruby filter that does
  ## this works fine; a sketch of it follows).
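  ## A minimal sketch of that decompression step, assuming the gzipped
  ## payload arrives base64-encoded in [payload][compressedPayload]
  ## (the field named in the remove_field below); adjust to match
  ## your events:
  ruby {
    init => 'require "base64"; require "zlib"; require "stringio"'
    code => '
      raw = Base64.decode64(event.get("[payload][compressedPayload]"))
      event.set("decompressedJson", Zlib::GzipReader.new(StringIO.new(raw)).read)
    '
  }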
  mutate {
    remove_field => ["[_source][event][original]", "[payload][compressedPayload]", "[event][original]"]
  }
  ## Parse the decompressed text as JSON
  json {
    source => "decompressedJson"
    target => "json"
    add_tag => ["_message_json_parsed"]
  }
  ## Keep only documents whose status is SEND_TO_OMS
  if [json][status] != "SEND_TO_OMS" {
    drop {}
  }
  mutate {
    remove_field => ["decompressedJson"]
  }
  ## split is its own filter plugin, not a mutate option
  split {
    field => "[json][items]"
  }
  mutate {
    add_field => { "contractId" => "%{[json][contractId]}" }
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch-host:80"
    index => "json-stage"
  }
}
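Even if the split part worked, I think I would still need to merge each item with the shared root-level fields so the documents come out flat like the examples above. This is the ruby filter I was planning to try next; it's only a sketch based on my sample (field names taken from it, not tested), so please correct me if there's a better way:
filter {
  ## After the split filter, [json][items] holds a single item hash.
  ## Promote the item's fields and the shared root-level fields to the
  ## top of the event, then drop the original [json] object (which also
  ## drops "payments", as in the desired documents).
  ruby {
    code => '
      item = event.get("[json][items]")
      item.each { |k, v| event.set(k, v) } if item.is_a?(Hash)
      ["contractId", "version", "basketId", "itemCount"].each do |k|
        event.set(k, event.get("[json][#{k}]"))
      end
    '
  }
  mutate {
    remove_field => ["json"]
  }
}
Is the standalone split filter plus a ruby filter like this the right approach, or is there a simpler way to index each item as its own document?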