Elasticsearch Query with pattern and time sequence

I have some event logs in Elasticsearch. Each log has a timestamp, an action, and a userId. I want to find users whose logs match a given sequence of actions over time. Here is some sample data.

+---------------------+--------+--------+
| timestamp           | action | userId |
+---------------------+--------+--------+
| 2019-10-14 08:34:03 | A      | userA  |
| 2019-10-14 11:39:26 | A      | userB  |
| 2019-10-14 11:41:23 | A      | userA  |
| 2019-10-14 12:10:56 | B      | userB  |
| 2019-10-14 12:51:42 | B      | userA  |
| 2019-10-14 15:23:48 | C      | userA  |
| 2019-10-14 16:05:19 | B      | userB  |
| 2019-10-14 19:44:01 | A      | userC  |
| 2019-10-15 03:18:15 | D      | userA  |
+---------------------+--------+--------+

For example, if the condition is action: A -> action: B -> action: C, then I should get userA. Can Elasticsearch do this? If not, is there any tool (BigQuery or something else) or database that can? Or does it just take multiple queries plus some code of my own to process the results, or does this problem even need machine learning?


You can answer this type of question with aggregations, or the new transforms feature in Elasticsearch (if the data is large and continuous).

For example,

PUT spans
{
  "mappings": {
    "properties": {
      "timestamp": { "type": "date" },
      "action":    { "type": "keyword" },
      "userId":    { "type": "keyword" }
    }
  }
}

POST _bulk
{"index":{"_index":"spans"}}
{"timestamp":"2019-10-14T08:34:03","action":"A","userId":"userA"}
{"index":{"_index":"spans"}}
{"timestamp":"2019-10-14T11:39:26","action":"A","userId":"userB"}
{"index":{"_index":"spans"}}
{"timestamp":"2019-10-14T11:41:23","action":"A","userId":"userA"}
{"index":{"_index":"spans"}}
{"timestamp":"2019-10-14T12:10:56","action":"B","userId":"userB"}
{"index":{"_index":"spans"}}
{"timestamp":"2019-10-14T12:51:42","action":"B","userId":"userA"}
{"index":{"_index":"spans"}}
{"timestamp":"2019-10-14T15:23:48","action":"C","userId":"userA"}
{"index":{"_index":"spans"}}
{"timestamp":"2019-10-14T16:05:19","action":"B","userId":"userB"}
{"index":{"_index":"spans"}}
{"timestamp":"2019-10-14T19:44:01","action":"A","userId":"userC"}
{"index":{"_index":"spans"}}
{"timestamp":"2019-10-15T03:18:15","action":"D","userId":"userA"}

PUT _data_frame/transforms/transaction
{
  "source": {
    "index": [
      "spans"
    ]
  },
  "dest": {
    "index": "transactions"
  },
  "pivot": {
    "group_by": {
      "date": {
        "terms": {
          "field": "userId"
        }
      }
    },
    "aggregations": {
      "min_timestamp": {
        "min": {
          "field": "timestamp"
        }
      },
      "max_timestamp": {
        "max": {
          "field": "timestamp"
        }
      },
      "duration": {
        "bucket_script": {
          "buckets_path": {
            "max": "max_timestamp",
            "min": "min_timestamp"
          },
          "script": "params.max-params.min"
        }
      },
      "trace": {
        "scripted_metric": {
          "init_script": "state.spans = []",
          "map_script": """
            Map span = [
              'timestamp':doc['timestamp'].value, 
              'action':doc['action'].value
            ]; 
            state.spans.add(span)
          """,
          "combine_script": "return state.spans",
          "reduce_script": """
            def ret = []; 
            for (s in states) { 
              for (span in s) { 
                ret.add(span); 
              }
            }
            ret.sort((HashMap o1, HashMap o2)->o1['timestamp'].toInstant().toEpochMilli().compareTo(o2['timestamp'].toInstant().toEpochMilli())); 
            return ret;
            """
        }
      },
      "signature": {
        "scripted_metric": {
          "init_script": "state.spans = []",
          "map_script": """
            Map span = [
              'timestamp':doc['timestamp'].value, 
              'action':doc['action'].value
            ]; 
            state.spans.add(span)
          """,
          "combine_script": "return state.spans",
          "reduce_script": """
            def ret = []; 
            for (s in states) { 
              for (span in s) { 
                ret.add(span); 
              }
            }
            ret.sort((HashMap o1, HashMap o2)->o1['timestamp'].toInstant().toEpochMilli().compareTo(o2['timestamp'].toInstant().toEpochMilli()));
            def signature = "";
            for (span in ret) {
              signature += span['action']
            }
            return signature;
            """
        }
      }
    }
  }
}

POST _data_frame/transforms/transaction/_start

GET transactions/_search
{
  "query": {
    "wildcard": {
      "signature.keyword": "*ABCD*"
    }
  }
}
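
Note that userA's signature comes out as AABCD, which happens to contain ABCD as a substring. If the condition is only "A then B then C", possibly with other actions in between, a looser pattern would do it (my variation, not part of the original example):

GET transactions/_search
{
  "query": {
    "wildcard": {
      "signature.keyword": "*A*B*C*"
    }
  }
}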

The transform turns the raw data into an entity-centric index keyed by user. Here is what the transactions index looks like, with one document per user:

{
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "transactions",
        "_type" : "_doc",
        "_id" : "dVXi9btFe7o12AMiSauzNKgAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "date" : "userA",
          "duration" : 6.7452E7,
          "trace" : [
            {
              "action" : "A",
              "timestamp" : "2019-10-14T08:34:03.000Z"
            },
            {
              "action" : "A",
              "timestamp" : "2019-10-14T11:41:23.000Z"
            },
            {
              "action" : "B",
              "timestamp" : "2019-10-14T12:51:42.000Z"
            },
            {
              "action" : "C",
              "timestamp" : "2019-10-14T15:23:48.000Z"
            },
            {
              "action" : "D",
              "timestamp" : "2019-10-15T03:18:15.000Z"
            }
          ],
          "signature" : "AABCD",
          "min_timestamp" : "2019-10-14T08:34:03.000Z",
          "max_timestamp" : "2019-10-15T03:18:15.000Z"
        }
      },
      ...
}

A really useful feature is that transforms can be continuous (i.e. the transactions index is automatically updated when new items are added to spans).
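
For reference, continuous mode is switched on by adding sync and frequency settings at the top level of the transform definition; this fragment uses illustrative values (the follow-up below happens to use the same ones):

"frequency": "1m",
"sync": {
  "time": {
    "field": "timestamp",
    "delay": "120s"
  }
}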


Thank you, it is really helpful, but I ran into another problem: I executed your code and it worked fine the first time. At the end of your reply you mentioned that the transform can be continuous, so I checked the documentation and added sync and frequency to the request body of the transform.
Here is the transform:

GET _data_frame/transforms/transaction

Response

{
  "count" : 1,
  "transforms" : [
    {
      "id" : "transaction",
      "source" : {
        "index" : [
          "spans"
        ],
        "query" : {
          "match_all" : { }
        }
      },
      "dest" : {
        "index" : "transactions"
      },
      "frequency" : "1m",
      "sync" : {
        "time" : {
          "field" : "timestamp",
          "delay" : "120s"
        }
      },
      "pivot" : {
        "group_by" : {
          "date" : {
            "terms" : {
              "field" : "userId"
            }
          }
        },
        "aggregations" : {
          "min_timestamp" : {
            "min" : {
              "field" : "timestamp"
            }
          },
          "max_timestamp" : {
            "max" : {
              "field" : "timestamp"
            }
          },
          "duration" : {
            "bucket_script" : {
              "buckets_path" : {
                "max" : "max_timestamp",
                "min" : "min_timestamp"
              },
              "script" : "params.max-params.min"
            }
          },
          "trace" : {
            "scripted_metric" : {
              "init_script" : "state.spans = []",
              "map_script" : """
            Map span = [
              'timestamp':doc['timestamp'].value, 
              'action':doc['action'].value
            ]; 
            state.spans.add(span)
""",
              "combine_script" : "return state.spans",
              "reduce_script" : """
            def ret = []; 
            for (s in states) { 
              for (span in s) { 
                ret.add(span); 
              }
            }
            ret.sort((HashMap o1, HashMap o2)->o1['timestamp'].toInstant().toEpochMilli().compareTo(o2['timestamp'].toInstant().toEpochMilli())); 
            return ret;
"""
            }
          },
          "signature" : {
            "scripted_metric" : {
              "init_script" : "state.spans = []",
              "map_script" : """
            Map span = [
              'timestamp':doc['timestamp'].value, 
              'action':doc['action'].value
            ]; 
            state.spans.add(span)
""",
              "combine_script" : "return state.spans",
              "reduce_script" : """
            def ret = []; 
            for (s in states) { 
              for (span in s) { 
                ret.add(span); 
              }
            }
            ret.sort((HashMap o1, HashMap o2)->o1['timestamp'].toInstant().toEpochMilli().compareTo(o2['timestamp'].toInstant().toEpochMilli()));
            def signature = "";
            for (span in ret) {
              signature += span['action']
            }
            return signature;
"""
            }
          }
        }
      },
      "version" : "7.3.1",
      "create_time" : 1571708308388
    }
  ]
}

Then I started it again. It still worked the first time, but the transactions index is not updated when new items are added to spans.
Did I miss something?

I found out why the transform did not work, or rather, why it was not triggered.
I used the bulk API to insert some new spans, but their timestamps were not in the right time interval.

The new span's timestamp should fall in the interval from the current time (the last checkpoint) to that time plus sync.time.delay (last checkpoint + sync.time.delay, I think).
If any span (event) arrives with a timestamp in the right interval, the transform is triggered and starts processing the documents that have not been processed yet.
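
If in doubt, the transform's checkpoint position can be inspected with the stats endpoint, which shows how far processing has progressed:

GET _data_frame/transforms/transaction/_stats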

