Extract some data from an event and send it to a different index


(Andy Foster) #1

I have events that look something like this.

{
  "_index": "my-index",
  "_type": "doc",
  "_id": "ebfc6jb3fud8gjml2jq9g",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2018-10-30T14:24:48.670Z",
    "kafka-timestamp": 1540909488693,
    "@version": "1",
      "created": 1540909488670,
      "id": "ebfc6jb3fud8gjml2jq9g",
      "status": "active",
      "event-params": {
        "OID": "obfc6jbbfud8gjml2jqa0",
        "RID": "ubfc6gbrfud8gjml2jq8g",
        "SCODE": 3,
        "FCOUNT": "15/15"
      },
      "data": {
        "successFileCount": "15/15",
        "timestamp": 1540909488651,
        "results": {
          "object": {
            "success": "true",
            "parent-collection-id": "cbfbloobfud8gjml2jpn0",
            "object-created": "obfc6jbbfud8gjml2jqa0"
          },
          "files": {
            "binary": [
              {
                "success": "true",
                "file-properties": {
                  "size": 29270640,
                  "format": {
                    "format": "Audio Interchange File Format (compressed)",
                    "ns": "pronom",
                    "mime": "audio/x-aiff",
                    "version": "",
                    "basis": "byte match at 0, 12",
                    "warning": "extension mismatch",
                    "id": "x-fmt/136"
                  },
                  "hash": {
                    "md5": "ee67263df6e7077fb0521ce24b83b153",
                    "sha256": "bf7596450788f0117cb16094a48aba0bb1d67fc480ddffb2b604a89e4ecb92c9"
                  },
                  "storage": {
                    "object": {
                      "file_name": "5de29611-25ab-4a57-b050-a4f3e8a4b718_10 Audio Track.aiff"
                    },
                    "block": false
                  },
                  "provider": {
                    "provider-id": "pbfbkn5rfud8gjml2jplg",
                    "provider-description": ""
                  },
                  "receiptId": "ubfc6gbrfud8gjml2jq8g"
                },
                "parent-object": "obfc6jbbfud8gjml2jqa0",
                "file-created": "fbfc6jc3fud8gjml2jqh0",
                "relative-path": "/December 8, 2017 audio/10 Audio Track.aiff"
              },
              {
                "success": "true",
                "file-properties": {
                  "size": 28612080,
                  "format": {
                    "format": "Audio Interchange File Format (compressed)",
                    "ns": "pronom",
                    "mime": "audio/x-aiff",
                    "version": "",
                    "basis": "byte match at 0, 12",
                    "warning": "extension mismatch",
                    "id": "x-fmt/136"
                  },
                  "hash": {
                    "md5": "e94cd5ad668761d8625c6b7f9f9d85e4",
                    "sha256": "e485cc8c0f7ff293f039f1ebb3bc90eeeedc6747618e446782c1ff1e63a860da"
                  },
                  "storage": {
                    "object": {
                      "file_name": "bec1360d-4020-4bbf-8302-1afb27ef7844_11 Audio Track.aiff"
                    },
                    "block": false
                  },
                  "provider": {
                    "provider-id": "pbfbkn5rfud8gjml2jplg",
                    "provider-description": ""
                  },
                  "receiptId": "ubfc6gbrfud8gjml2jq8g"
                },
                "parent-object": "obfc6jbbfud8gjml2jqa0",
                "file-created": "fbfc6jc3fud8gjml2jqd0",
                "relative-path": "/December 8, 2017 audio/11 Audio Track.aiff"
              },
              {
                "success": "true",
                "file-properties": {
                  "size": 50859648,
                  "format": {
                    "format": "Audio Interchange File Format (compressed)",
                    "ns": "pronom",
                    "mime": "audio/x-aiff",
                    "version": "",
                    "basis": "byte match at 0, 12",
                    "warning": "extension mismatch",
                    "id": "x-fmt/136"
                  },
                  "hash": {
                    "md5": "b70ad2c3ad7dd3b89fce98f6b6ba32d2",
                    "sha256": "3a599d9cef52297b01886c106cf0efc52146bcdf7a9db9cd6780f860d372b174"
                  },
                  "storage": {
                    "object": {
                      "file_name": "4c4bb115-d3b9-4b91-8ed3-333daa38ad05_12 Audio Track.aiff"
                    },
                    "block": false
                  },
                  "provider": {
                    "provider-id": "pbfbkn5rfud8gjml2jplg",
                    "provider-description": ""
                  },
                  "receiptId": "ubfc6gbrfud8gjml2jq8g"
                },
                "parent-object": "obfc6jbbfud8gjml2jqa0",
                "file-created": "fbfc6jc3fud8gjml2jqb0",
                "relative-path": "/December 8, 2017 audio/12 Audio Track.aiff"
              }
            ]
          }
        },
        "statusCode": 3
      },
      "receiptID": "ubfc6gbrfud8gjml2jq8g",
      "eventType": 13
  }
}

I want to get the data.files.binary array of objects into a separate index, with one document per object.

I've looked into using the nested data type, but Kibana has limited support for nested fields.

I think my only option is to create two separate pipelines. The first would drop the array of objects entirely and index the rest of the event as-is. The second would do the inverse: prune everything except the array, run it through the split filter so each element becomes its own event, and output to the separate index.
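Roughly, the second pipeline's filter and output sections might look like this (untested; the host and index name are placeholders, and since the prune filter only matches top-level field names, I keep the whole data field and split on the nested array):

filter {
  # prune operates on top-level field names only, so whitelist the
  # entire "data" field rather than trying to match the subfield
  prune {
    whitelist_names => ["^@timestamp$", "^data$"]
  }

  # emit one event per element of the array
  split {
    field => "[data][files][binary]"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "binary-files"
  }
}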

Does anyone know if I can achieve this with a single pipeline?


(Christian Dahlqvist) #2

You can use a clone filter to create a copy of each event, then delete fields as needed from the clone before applying a split filter to the data.files.binary field. You can then route the original and the cloned events to different indices.
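For example, something along these lines (untested; the hosts, index names, and the list of fields to remove are placeholders you would adjust to your setup; the clone filter sets the type field of the copy to the clone name, which the conditionals use for routing):

filter {
  # create a copy of each event; the copy gets type => "binary-files"
  clone {
    clones => ["binary-files"]
  }

  # only on the clone: drop fields the per-file documents do not need,
  # then turn each array element into its own event
  if [type] == "binary-files" {
    mutate {
      remove_field => ["event-params", "status", "id", "created", "receiptID", "eventType"]
    }
    split {
      field => "[data][files][binary]"
    }
  }
}

output {
  if [type] == "binary-files" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "binary-files"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "my-index"
    }
  }
}

The single conditional on type keeps everything in one pipeline: original events flow through untouched to the main index, while the clones are stripped down, split, and routed to the per-file index.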


(Andy Foster) #3

Terrific, working great, thank you!

