My multi-stage solution is below. I used the kibana_sample_data_logs sample dataset and treated `clientip` as a fake "user ID". I selected new users for the last day (not month).
Outline
- Create a 1st transform that groups by `clientip` to get each client's first timestamp.
- Create a 2nd transform that aggregates the first timestamps into day buckets, with a scripted metric aggregation to collect the unique `clientip`s as an array.
- Query with a terms lookup to filter the `clientip`s whose first timestamp is on a specific day.
Detail
1. Create the 1st transform, grouping by `clientip` to get each client's first timestamp
PUT /_transform/test_kibana_sample_first_timestamp
{
"dest":{
"index": "test_kibana_sample_first_timestamp"
},
"pivot": {
"group_by": {
"clientip": {
"terms": {
"field": "clientip"
}
}
},
"aggregations": {
"first_timestamp": {
"min": {
"field": "timestamp"
}
}
}
},
"source": {
"index": "kibana_sample_data_logs"
}
}
POST _transform/test_kibana_sample_first_timestamp/_start
Then you get:
GET /test_kibana_sample_first_timestamp/_search?size=3&filter_path=hits.hits._source
{
"hits" : {
"hits" : [
{
"_source" : {
"first_timestamp" : "2022-01-24T07:51:57.333Z",
"clientip" : "0.72.176.46"
}
},
{
"_source" : {
"first_timestamp" : "2022-01-26T11:02:32.392Z",
"clientip" : "0.207.229.147"
}
},
{
"_source" : {
"first_timestamp" : "2022-01-27T10:55:29.114Z",
"clientip" : "0.209.144.101"
}
}
]
}
}
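The pivot in the 1st transform is just a group-by with a `min` aggregation. As a minimal sketch of what it computes (the sample docs below are hypothetical stand-ins for `kibana_sample_data_logs` hits, but the field names `clientip` and `timestamp` match the real dataset):

```python
# Hypothetical sample docs standing in for kibana_sample_data_logs hits.
logs = [
    {"clientip": "0.72.176.46", "timestamp": "2022-01-24T07:51:57.333Z"},
    {"clientip": "0.72.176.46", "timestamp": "2022-01-26T09:00:00.000Z"},
    {"clientip": "0.207.229.147", "timestamp": "2022-01-26T11:02:32.392Z"},
]

# group_by clientip + min(timestamp), as the pivot transform does.
# ISO 8601 strings in the same format compare correctly as strings.
first_timestamp = {}
for doc in logs:
    ip, ts = doc["clientip"], doc["timestamp"]
    if ip not in first_timestamp or ts < first_timestamp[ip]:
        first_timestamp[ip] = ts

print(first_timestamp["0.72.176.46"])  # -> 2022-01-24T07:51:57.333Z
```

Each `clientip` ends up with exactly one document in the destination index, holding its earliest timestamp.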
2. Create the 2nd transform to aggregate the first timestamps into day buckets, with a scripted metric aggregation to collect the unique `clientip`s as an array
First, create target index of the 2nd transform:
PUT /test_kibana_sample_first_timestamp_bucket/
{
"mappings": {
"properties": {
"day": {"type":"date"},
"new_ip": {"type": "ip"}
}
}
}
Then create an ingest pipeline that sets the `_id` field to the day bucket:
PUT /_ingest/pipeline/test_set_id
{
"processors": [
{
"set": {
"field": "_id",
"value": "{{day}}"
}
}
]
}
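The `set` processor renders the Mustache template `{{day}}` against each document, so the day bucket value becomes the document `_id` and the destination index gets exactly one deterministic document per day. A rough Python stand-in for that templating step (the simple `replace` is an assumption, not real Mustache):

```python
# Document as produced by the 2nd transform (values are illustrative).
doc = {"day": "2022-01-23T00:00:00.000Z", "new_ip": ["1.5.239.89"]}

# The set processor fills "{{day}}" from the document and writes it to _id;
# a plain string replace is a simplistic stand-in for Mustache rendering.
template = "{{day}}"
doc_id = template.replace("{{day}}", doc["day"])
print(doc_id)  # -> 2022-01-23T00:00:00.000Z
```

Using the day as `_id` means re-running the transform overwrites the same per-day document instead of creating duplicates.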
Create the 2nd transform and start it. I used the scripted metric "unique values" pattern for the aggregation.
PUT /_transform/test_kibana_sample_first_timestamp_bucket
{
"dest":{
"index": "test_kibana_sample_first_timestamp_bucket",
"pipeline": "test_set_id"
},
"pivot": {
"group_by": {
"day": {
"date_histogram": {
"field": "first_timestamp",
"calendar_interval": "day"
}
}
},
"aggregations": {
"new_ip": {
"scripted_metric": {
"init_script": "state.set = new HashSet()",
"map_script": "if (params['_source'].containsKey(params.field)) {state.set.add(params['_source'][params.field])}",
"combine_script": "return state.set",
"reduce_script": "def ret = new HashSet(); for (s in states) {for (k in s) {ret.add(k);}} return ret",
"params":{
"field": "clientip"
}
}
}
}
},
"source": {
"index": "test_kibana_sample_first_timestamp"
}
}
POST /_transform/test_kibana_sample_first_timestamp_bucket/_start
Then you get:
GET /test_kibana_sample_first_timestamp_bucket/_search?size=1&filter_path=hits.hits._id,hits.hits._source
{
"hits" : {
"hits" : [
{
"_id" : "2022-01-23T00:00:00.000Z",
"_source" : {
"day" : "2022-01-23T00:00:00.000Z",
"new_ip" : [
"19.253.238.55",
"42.17.43.107",
"81.32.253.92",
"91.183.212.113",
"240.58.187.246",
"1.5.239.89",...
]
}
}
]
}
}
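The scripted metric runs in four phases: `init_script` and `map_script` build a set per shard, `combine_script` returns that shard-local set, and `reduce_script` merges the per-shard sets on the coordinating node. A sketch of those semantics (the two shard doc lists are hypothetical):

```python
# Hypothetical per-shard document lists; note the duplicate IP across shards.
shard1_docs = [{"clientip": "19.253.238.55"}, {"clientip": "42.17.43.107"}]
shard2_docs = [{"clientip": "42.17.43.107"}, {"clientip": "81.32.253.92"}]

def map_combine(docs, field="clientip"):
    state = set()            # init_script: state.set = new HashSet()
    for doc in docs:         # map_script: add the field value if present
        if field in doc:
            state.add(doc[field])
    return state             # combine_script: return state.set

states = [map_combine(shard1_docs), map_combine(shard2_docs)]

# reduce_script: union all shard sets into one set of unique IPs
new_ip = set().union(*states)
print(sorted(new_ip))
```

Because each phase works with a set, the duplicate `clientip` appearing on both shards is deduplicated in the final `new_ip` array.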
3. Query with a terms lookup to filter documents from clients whose first timestamp is on a specific day
GET /kibana_sample_data_logs/_search
{
"query":{
"terms": {
"clientip": {
"index": "test_kibana_sample_first_timestamp_bucket",
"id": "2022-01-23T00:00:00.000Z",
"path": "new_ip"
}
}
}
}
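The terms lookup first fetches the `new_ip` array from the bucket document whose `_id` is the requested day, then filters the log index by membership in that array. The equivalent logic, sketched with hypothetical in-memory data:

```python
# Stand-in for the stored bucket document fetched by the terms lookup.
bucket_doc = {"day": "2022-01-23T00:00:00.000Z",
              "new_ip": ["19.253.238.55", "42.17.43.107"]}

# Stand-in for documents in kibana_sample_data_logs.
logs = [
    {"clientip": "19.253.238.55", "message": "new user hit"},
    {"clientip": "240.58.187.246", "message": "returning user"},
]

lookup = set(bucket_doc["new_ip"])  # terms pulled from the lookup document
hits = [d for d in logs if d["clientip"] in lookup]
print(len(hits))  # only docs whose clientip first appeared that day
```

This is why the bucket index needs one document per day with a stable `_id`: the lookup addresses it directly by `index`, `id`, and `path`.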
This is a sample visualization filtered by `clientip`s whose first timestamp is on 2022-01-27.
Note that you have to use the Query DSL to apply a terms lookup query as a filter.