Hi all,
recently I played a bit with the Transforms feature in Elasticsearch.
I started by creating this test continuous transform:
{
  "description": "Create summary transaction entities",
  "frequency": "1m",
  "source": {
    "index": [
      "myapp.logs"
    ],
    "query": {
      "bool": {
        "must": [
          {
            "exists": {
              "field": "transaction.id"
            }
          },
          {
            "range": {
              "@timestamp": {
                "gte": "now-15m",
                "lte": "now"
              }
            }
          }
        ]
      }
    }
  },
  "dest": {
    "index": "myapp.transactions"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "transaction.id": {
        "terms": {
          "field": "transaction.id"
        }
      }
    },
    "aggregations": {
      "docs_count": {
        "value_count": {
          "field": "@timestamp"
        }
      },
      "transaction.start": {
        "min": {
          "field": "@timestamp"
        }
      },
      "transaction.end": {
        "max": {
          "field": "@timestamp"
        }
      },
      "transaction.duration": {
        "bucket_script": {
          "buckets_path": {
            "start": "transaction.start.value",
            "end": "transaction.end.value"
          },
          "script": "(params.end - params.start)"
        }
      }
    }
  }
}
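For reference, I created and started it with the transform APIs like this (the transform id myapp-transactions is just the name I picked; the request body is the definition posted above):

```
PUT _transform/myapp-transactions
{ ...the transform definition above... }

POST _transform/myapp-transactions/_start
```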
So I have a time-based index (myapp.logs) in which I store logs from an application. In particular, some of these logs report information about incoming HTTP requests and the related responses.
Here are some example documents from the source index:
[
  {
    "_id": "1",
    "_index": "myapp.logs",
    "_type": "_doc",
    "_source": {
      "@timestamp": "2020-10-21T09:00:00.000Z",
      "service.name": "myServiceA",
      "request.payload": "here some payload",
      "type": "request",
      "transaction.id": "abcd"
    }
  },
  {
    "_id": "2",
    "_index": "myapp.logs",
    "_type": "_doc",
    "_source": {
      "@timestamp": "2020-10-21T09:00:00.125Z",
      "service.name": "myServiceA",
      "response.payload": "here some payload",
      "type": "response",
      "transaction.id": "abcd"
    }
  },
  {
    "_id": "3",
    "_index": "myapp.logs",
    "_type": "_doc",
    "_source": {
      "@timestamp": "2020-10-21T09:00:00.050Z",
      "service.name": "myServiceA",
      "request.payload": "here some payload",
      "type": "request",
      "transaction.id": "defg"
    }
  },
  {
    "_id": "4",
    "_index": "myapp.logs",
    "_type": "_doc",
    "_source": {
      "@timestamp": "2020-10-21T09:00:00.375Z",
      "service.name": "myServiceA",
      "response.payload": "here some payload",
      "type": "response",
      "transaction.id": "defg"
    }
  }
]
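In case anyone wants to reproduce this, documents like these can be indexed with the bulk API (index name from my setup, ids arbitrary; only the first transaction shown):

```
POST myapp.logs/_bulk
{ "index": { "_id": "1" } }
{ "@timestamp": "2020-10-21T09:00:00.000Z", "service.name": "myServiceA", "request.payload": "here some payload", "type": "request", "transaction.id": "abcd" }
{ "index": { "_id": "2" } }
{ "@timestamp": "2020-10-21T09:00:00.125Z", "service.name": "myServiceA", "response.payload": "here some payload", "type": "response", "transaction.id": "abcd" }
```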
As you can see, for each transaction (identified by the field transaction.id) there is a pair of documents in the source index: one reporting the request and the other reporting the response. The two can be distinguished by the field type, which has the value "request" for the log of the request and "response" for the log of the response.
The transform, whose definition I posted above, should create a summary index with one document per transaction.
The summary entity in the destination index should have the request timestamp (transaction.start), the response timestamp (transaction.end) and the duration of the transaction (transaction.duration), calculated as the difference between the response and the request timestamps.
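If I'm reading the bucket_script correctly, the min/max aggregations on a date field return the value as epoch milliseconds, so for transaction "abcd" above the script should evaluate to:

```
start       = 1603270800000   (2020-10-21T09:00:00.000Z)
end         = 1603270800125   (2020-10-21T09:00:00.125Z)
end - start = 125             (transaction.duration, in milliseconds)
```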
The result from running the transform in batch mode on the sample documents shown above would be something like this:
[
  {
    "_id": "99",
    "_index": "myapp.transactions",
    "_type": "_doc",
    "_source": {
      "docs_count": 2,
      "transaction.id": "abcd",
      "transaction.start": "2020-10-21T09:00:00.000Z",
      "transaction.end": "2020-10-21T09:00:00.125Z",
      "transaction.duration": 125
    }
  },
  {
    "_id": "100",
    "_index": "myapp.transactions",
    "_type": "_doc",
    "_source": {
      "docs_count": 2,
      "transaction.id": "defg",
      "transaction.start": "2020-10-21T09:00:00.050Z",
      "transaction.end": "2020-10-21T09:00:00.375Z",
      "transaction.duration": 325
    }
  }
]
I created the transform and so far it seems it is working.
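(I've been checking its progress with the stats API, which reports the checkpoints and the number of documents processed; the transform id is just the name I gave mine:)

```
GET _transform/myapp-transactions/_stats
```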
However, since I'm continuously ingesting new documents in the source index, I want to be sure that the transform is not missing any updates.
Let me explain better.
I set the frequency parameter to 1m.
This means that only the logs that have been ingested into the source index in the interval (now-1m, now), where "now" is the time at which the transform is executed, should create or update entities in the destination index (suppose that the @timestamp field is also the exact time at which the document is ingested).
Now suppose that, when Elasticsearch runs the transform and checks for changes to the source index, for a given transaction (again identified by the field transaction.id) it finds only the type=request log, because the corresponding type=response log has not been ingested yet.
At this stage, the transform will create a new "transaction" entity in the destination index containing the results of aggregations computed over only the type=request document (so, for example, docs_count will be equal to 1 and transaction.duration to 0).
If I understood correctly how transforms work, on the next cycle, when the type=response log is present in the source index, a new transaction entity (with the same value in the transaction.id field) will be created, but it will contain only aggregations computed on the type=response document.
This is because the type=request document in the source index is outside the (now-1m, now) time window in which Elasticsearch checks for changes.
Is this correct?
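To make the scenario concrete, this is the timeline I'm worried about (times made up):

```
checkpoint N   window (09:00:00, 09:01:00]:
  source contains: request log  @ 09:00:59, transaction.id = "xyz"
  dest gets:       { "transaction.id": "xyz", "docs_count": 1, "transaction.duration": 0 }

checkpoint N+1 window (09:01:00, 09:02:00]:
  source contains: response log @ 09:01:02, transaction.id = "xyz"
  dest gets:       an entity computed from the response log only?
```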
From the documentation on the Create Transform API it seems like I can set the sync.time.delay parameter "such that it accounts for data transmission delays".
What does this mean exactly? The documentation here is not very clear.
If, for example, I set sync.time.delay to 60s (as in the example transform definition above), does that mean that every time the transform runs, Elasticsearch waits for updates to the source index that can occur up to 60s after the transform execution?
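In other words, if my ingest pipeline can lag behind by up to, say, 5 minutes, would it be enough to raise the delay like this (the value is just an example)?

```json
"sync": {
  "time": {
    "field": "@timestamp",
    "delay": "300s"
  }
}
```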
In the Limitations section of the documentation regarding transforms instead I found this:
A continuous transform periodically checks for changed entities between the time since it last checked and now minus sync.time.delay. This time window moves without overlapping. If the timestamp of a recently indexed document falls within this time window but this document is not yet available to search then this entity will not be updated.
If this statement is correct, it means that in my case, if the request log and the response log of a given transaction happen to fall inside two distinct transform time windows, they will create two distinct transaction entities in the destination index, and there is no way to tell Elasticsearch to look further into the past than the 1m interval set by the frequency parameter.
So my question is: is there a way to customize the interval over which Elasticsearch checks for new updates to the source index, so that it is different from the frequency interval (i.e. the interval at which each new transform cycle runs)?