Combine connect and disconnect documents with transform?

Hello!

I'm new to transforms, so I'm not really sure, but I think a transform is what I need here.

Logs are pushed from a web application to ES (via fluentd), and among them are connect and disconnect messages. I would like to combine them into a single "session" log.

Example

{
  "user.name": "donald",
  "organization.name": "orgA",
  "event": "connected",
  "connectionID": "8adca1fb",
  "timestamp": "2024-12-27T08:00:00.000Z"
},
{
  "user.name": "donald",
  "organization.name": "orgA",
  "event": "disconnected",
  "connectionID": "8adca1fb",
  "timestamp": "2024-12-27T09:00:00.000Z"
}

A new document should be added on connect, and on disconnect that document should be updated, matched on user name, org and connection ID. Optionally, duration and other session information could be added as well.

Is that possible?

The new index could have the following base fields; I will probably add more later, such as device info.

{
  "user.name": "donald",
  "organization.name": "orgA",
  "connectionID": "8adca1fb",
  "timestampStart": "2024-12-27T08:00:00.000Z",
  "timestampEnd": "2024-12-27T09:00:00.000Z",
  "duration": 3600.0
}

So any ongoing sessions will not have timestampEnd and duration.

Later I could hopefully show them visually in a Gantt chart.

This should be possible given your document structure!

Here's an example configuration:

POST _transform/_preview
{
  "source": {
    "index": [
      "transform_transaction_source"
    ]
  },
  "pivot": {
    "group_by": {
      "id": {
        "terms": {
          "field": "connectionID.keyword"
        }
      }
    },
    "aggregations": {
      "connected": {
        "filter": {
          "term": {
            "event.keyword": "connected"
          }
        },
        "aggs": {
          "start": {
            "min": {
              "field": "timestamp"
            }
          }
        }
      },
      "disconnected": {
        "filter": {
          "term": {
            "event.keyword": "disconnected"
          }
        },
        "aggs": {
          "end": {
            "max": {
              "field": "timestamp"
            }
          }
        }
      },
      "diff": {
        "bucket_script": {
          "buckets_path": {
            "connected": "connected>start",
            "disconnected": "disconnected>end"
          },
          "script": "params.disconnected - params.connected"
        }
      }
    }
  }
}

It groups by the unique connection IDs, then uses filter aggregations with nested min/max sub-aggregations to identify the start and end timestamps. A bucket_script aggregation then calculates the duration as the difference between the two, in milliseconds. Apart from the bucket script, this can be built using the Transform wizard UI in the Kibana management section. I used the "copy-to-clipboard" option to copy the config to Kibana Dev Tools and add the bucket_script agg. Here's the output of that preview call:

{
  "preview": [
    {
      "connected": {
        "start": "2024-12-27T08:00:00.000Z"
      },
      "disconnected": {
        "end": "2024-12-27T09:00:00.000Z"
      },
      "diff": 3600000,
      "id": "8adca1fb"
    }
  ],
  "generated_dest_index": {
    "mappings": {
      "_meta": {
        "_transform": {
          "transform": "transform-preview",
          "version": {
            "created": "10.0.0"
          },
          "creation_date_in_millis": 1735312462757
        },
        "created_by": "transform"
      },
      "properties": {
        "disconnected.end": {
          "type": "date"
        },
        "connected": {
          "type": "object"
        },
        "disconnected": {
          "type": "object"
        },
        "connected.start": {
          "type": "date"
        },
        "id": {
          "type": "keyword"
        }
      }
    },
    "settings": {
      "index": {
        "number_of_shards": "1",
        "auto_expand_replicas": "0-1"
      }
    },
    "aliases": {}
  }
}
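
The diff in the preview is in milliseconds (3600000 for the one-hour example), while the session document in the question uses seconds (3600.0) and also carries the user and organization. A hedged variant of the same preview is sketched below. It assumes user.name and organization.name have .keyword sub-fields like connectionID does, and the group_by names user_name and organization_name and the aggregation name duration are illustrative choices that are not part of the original config.

```console
# Same preview as above, but grouped by connection, user and organization,
# with the duration converted from milliseconds to seconds.
POST _transform/_preview
{
  "source": {
    "index": [
      "transform_transaction_source"
    ]
  },
  "pivot": {
    "group_by": {
      "connectionID": {
        "terms": { "field": "connectionID.keyword" }
      },
      "user_name": {
        "terms": { "field": "user.name.keyword" }
      },
      "organization_name": {
        "terms": { "field": "organization.name.keyword" }
      }
    },
    "aggregations": {
      "connected": {
        "filter": { "term": { "event.keyword": "connected" } },
        "aggs": {
          "start": { "min": { "field": "timestamp" } }
        }
      },
      "disconnected": {
        "filter": { "term": { "event.keyword": "disconnected" } },
        "aggs": {
          "end": { "max": { "field": "timestamp" } }
        }
      },
      "duration": {
        "bucket_script": {
          "buckets_path": {
            "connected": "connected>start",
            "disconnected": "disconnected>end"
          },
          "script": "(params.disconnected - params.connected) / 1000"
        }
      }
    }
  }
}
```

For a session that has no disconnected event yet there is no end value to feed the script, so I would expect duration to be left out of that document, which would match the "ongoing sessions" case in the question. It is worth confirming that in the preview output before relying on it.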

Great answer from @walterra.

Not sure if this is still an issue with the _transform APIs in 8.x, but I faced a similar problem a few years ago, in 5.x or 6.x times. I don't think the transform APIs were an option then, and in the end I had to use a different tool.

If the cardinality of user.name/orgId/connectionId gets "big" (in my case it was in the hundreds of thousands per hour), then I would worry that things get too slow or expensive. In my case there was also plenty of dirty data, e.g. connections missing disconnections, duplicate log entries, etc. There's also the complication that session durations could span multiple days.

Maybe none of that would be an issue in your case.

I tweaked the code a little bit, using my at-indexing-time eventIngested field to control the running of the _transform; maybe it also helps:

PUT _transform/sort_connections
{
  "source": {
    "index": [
      "connection-2024-12-27"
    ]
  },
  "sync": {
    "time": {
      "field": "eventIngested",
      "delay": "10s"
    }
  },
  "dest": {
    "index": "complete-connection-2024-12-27"
  },
  "pivot": {
    "group_by": {
      "id1": {
        "terms": {
          "field": "connectionID.keyword"
        }
      },
      "id2": {
        "terms": {
          "field": "user.name.keyword"
        }
      }
    },
    "aggregations": {
      "connected": {
        "filter": {
          "term": {
            "event.keyword": "connected"
          }
        },
        "aggs": {
          "start": {
            "min": {
              "field": "timestamp"
            }
          }
        }
      },
      "disconnected": {
        "filter": {
          "term": {
            "event.keyword": "disconnected"
          }
        },
        "aggs": {
          "end": {
            "max": {
              "field": "timestamp"
            }
          }
        }
      },
      "diff": {
        "bucket_script": {
          "buckets_path": {
            "connected": "connected>start",
            "disconnected": "disconnected>end"
          },
          "script": "params.disconnected - params.connected"
        }
      }
    }
  }
}

which gave me this view on some additional dummy data:

dummy data:

PUT /connection-2024-12-27/_bulk?pipeline=pipeline_add_ingest_timestamp
{ "create":{ } }
{ "user.name": "donald", "organization.name": "orgA", "event": "connected", "connectionID": "conn1", "timestamp": "2024-12-27T08:00:00.000Z" }
{ "create":{ } }
{ "user.name": "jon", "organization.name": "orgA", "event": "connected", "connectionID": "conn2", "timestamp": "2024-12-27T09:00:00.000Z" }
{ "create":{ } }
{ "user.name": "peter", "organization.name": "orgA", "event": "connected", "connectionID": "conn3", "timestamp": "2024-12-27T10:00:00.000Z" }
{ "create":{ } }
{ "user.name": "paul", "organization.name": "orgA", "event": "connected", "connectionID": "conn4", "timestamp": "2024-12-27T11:00:00.000Z" }
{ "create":{ } }
{ "user.name": "mike", "organization.name": "orgA", "event": "connected", "connectionID": "conn6", "timestamp": "2024-12-27T13:00:00.000Z" }
{ "create":{ } }
{ "user.name": "mike", "organization.name": "orgA", "event": "disconnected", "connectionID": "conn6", "timestamp": "2024-12-27T14:00:00.000Z" }
{ "create":{ } }
{ "user.name": "donald", "organization.name": "orgA", "event": "disconnected", "connectionID": "conn1", "timestamp": "2024-12-27T15:30:00.000Z" }
{ "create":{ } }
{ "user.name": "peter", "organization.name": "orgA", "event": "disconnected", "connectionID": "conn3", "timestamp": "2024-12-27T16:20:00.000Z" }
{ "create":{ } }
{ "user.name": "paul", "organization.name": "orgA", "event": "disconnected", "connectionID": "conn4", "timestamp": "2024-12-27T17:15:00.000Z" }
{ "create":{ } }
{ "user.name": "donald", "organization.name": "orgA", "event": "connected", "connectionID": "conn7", "timestamp": "2024-12-27T18:00:00.000Z" }
{ "create":{ } }
{ "user.name": "donald", "organization.name": "orgA", "event": "disconnected", "connectionID": "conn7", "timestamp": "2024-12-27T19:15:00.000Z" }

PUT /connection-2024-12-27/_bulk?pipeline=pipeline_add_ingest_timestamp
{ "create":{ } }
{"user.name": "jon", "organization.name": "orgA", "event": "disconnected", "connectionID": "conn2", "timestamp": "2024-12-27T09:30:00.000Z" }
{ "create":{ } }
{"user.name": "andrew", "organization.name": "orgA", "event": "connected", "connectionID": "conn9", "timestamp": "2024-12-27T19:30:00.000Z" }
{ "create":{ } }
{"user.name": "andrew", "organization.name": "orgA", "event": "disconnected", "connectionID": "conn9", "timestamp": "2024-12-27T19:45:00.000Z" }
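
The bulk requests above reference a pipeline_add_ingest_timestamp pipeline that is not shown in the thread, and the PUT _transform call only creates the transform without starting it. The following is a minimal sketch of the missing pieces, assuming the pipeline does nothing more than write the ingest time into eventIngested (its real definition may differ). The pipeline has to exist before the bulk requests are sent. The final search is an assumed way to list sessions that have no end yet, using the field names the transform produces.

```console
# Assumed definition of the pipeline used by the _bulk requests: it only
# copies the ingest time into eventIngested, the field the transform syncs on.
PUT _ingest/pipeline/pipeline_add_ingest_timestamp
{
  "description": "Stamp each document with the time it was ingested",
  "processors": [
    {
      "set": {
        "field": "eventIngested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}

# PUT _transform only creates the transform; it has to be started explicitly.
POST _transform/sort_connections/_start

# Check the state and progress of the transform.
GET _transform/sort_connections/_stats

# Sessions with a start but no end (still open, or the disconnect was lost).
GET complete-connection-2024-12-27/_search
{
  "query": {
    "bool": {
      "filter": { "exists": { "field": "connected.start" } },
      "must_not": { "exists": { "field": "disconnected.end" } }
    }
  }
}
```

Because the transform syncs on eventIngested rather than on timestamp, a disconnect that arrives late should still be picked up on a later checkpoint and update the existing session document, as long as it lands in the same source index.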

Thank you! It works perfectly now that I've figured out how to use the data URL and map the values to the root.