Finding the first occurrence of all Unique IDs within a term

My multistage solution is below. I used the kibana_sample_data_logs sample dataset and treated clientip as a stand-in 'user ID'. I selected new users for the last day (not the last month).

Outline

  1. Create the 1st transform, grouped by clientip, to get each clientip's first timestamp.
  2. Create the 2nd transform to aggregate the first timestamps into day buckets, using a scripted metric aggregation to collect the unique clientips as an array.
  3. Use a terms lookup query to filter by the clientips whose first timestamp is on a specific day.

Detail

1. Create the 1st transform, grouped by clientip, to get each clientip's first timestamp

PUT /_transform/test_kibana_sample_first_timestamp
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "pivot": {
    "group_by": {
      "clientip": {
        "terms": { "field": "clientip" }
      }
    },
    "aggregations": {
      "first_timestamp": {
        "min": { "field": "timestamp" }
      }
    }
  },
  "dest": {
    "index": "test_kibana_sample_first_timestamp"
  }
}

POST _transform/test_kibana_sample_first_timestamp/_start

Then you get:

GET /test_kibana_sample_first_timestamp/_search?size=3&filter_path=hits.hits._source

{
  "hits" : {
    "hits" : [
      {
        "_source" : {
          "first_timestamp" : "2022-01-24T07:51:57.333Z",
          "clientip" : "0.72.176.46"
        }
      },
      {
        "_source" : {
          "first_timestamp" : "2022-01-26T11:02:32.392Z",
          "clientip" : "0.207.229.147"
        }
      },
      {
        "_source" : {
          "first_timestamp" : "2022-01-27T10:55:29.114Z",
          "clientip" : "0.209.144.101"
        }
      }
    ]
  }
}

2. Create the 2nd transform to aggregate the first timestamps into day buckets, using a scripted metric aggregation to collect the unique clientips as an array

First, create the target index of the 2nd transform:

PUT /test_kibana_sample_first_timestamp_bucket/
{
  "mappings": {
    "properties": {
      "new_ip": {
        "type": "ip"
      },
      "day": {
        "type": "date"
      }
    }
  }
}

Then create an ingest pipeline to set the _id field:

PUT /_ingest/pipeline/test_set_id
{
  "processors": [
    { "set": { "value": "{{day}}", "field": "_id" } }
  ]
}

Create the 2nd transform and start it.
I used the scripted metric "unique values aggregation".

PUT /_transform/test_kibana_sample_first_timestamp_bucket
{
  "source": {
    "index": "test_kibana_sample_first_timestamp"
  },
  "pivot": {
    "group_by": {
      "day": {
        "date_histogram": {
          "calendar_interval": "day",
          "field": "first_timestamp"
        }
      }
    },
    "aggregations": {
      "new_ip": {
        "scripted_metric": {
          "params": { "field": "clientip" },
          "init_script": "state.set = new HashSet()",
          "map_script": "if (params['_source'].containsKey(params.field)) {state.set.add(params['_source'][params.field])}",
          "combine_script": "return state.set",
          "reduce_script": "def ret = new HashSet(); for (s in states) {for (k in s) {ret.add(k);}} return ret"
        }
      }
    }
  },
  "dest": {
    "pipeline": "test_set_id",
    "index": "test_kibana_sample_first_timestamp_bucket"
  }
}

POST /_transform/test_kibana_sample_first_timestamp_bucket/_start

Then you get:

GET /test_kibana_sample_first_timestamp_bucket/_search?size=1&filter_path=hits.hits._id,hits.hits._source

{
  "hits" : {
    "hits" : [
      {
        "_id" : "2022-01-23T00:00:00.000Z",
        "_source" : {
          "day" : "2022-01-23T00:00:00.000Z",
          "new_ip" : [
            "19.253.238.55",
            "42.17.43.107",
            "81.32.253.92",
            "91.183.212.113",
            "240.58.187.246",
            "1.5.239.89",...
          ]
        }
      }
    ]
  }
}

3. Use a terms lookup query to filter by the clientips whose first timestamp is on a specific day

You can filter clientips whose first timestamp is on a specific day.

GET /kibana_sample_data_logs/_search
{
  "query": {
    "terms": {
      "clientip": {
        "path": "new_ip",
        "id": "2022-01-23T00:00:00.000Z",
        "index": "test_kibana_sample_first_timestamp_bucket"
      }
    }
  }
}

This is a sample visualization filtered by the clientips whose first timestamp is on 2022-01-27.

To filter with a terms lookup query, you have to define the filter in Query DSL.

1 Like