Understanding continuous transforms syncing

Hi all,

recently I played a bit with the Transforms feature on Elasticsearch.

I started by creating this test continuous transform

{
	"description": "Create summary transaction entities",
	"frequency": "1m",
	"source": {
		"index": [
			"myapp.logs"
		],
		"query": {
			"bool": {
				"must": [
					{
						"exists": {
							"field": "transaction.id"
						}
					},
					{
						"range": {
							"@timestamp": {
								"gte": "now-15m",
								"lte": "now"
							}
						}
					}
				]
			}
		}
	},
	"dest": {
		"index": "myapp.transactions"
	},
	"sync": {
		"time": {
			"field": "@timestamp",
			"delay": "60s"
		}
	},
	"pivot": {
		"group_by": {
			"transaction.id": {
				"terms": {
					"field": "transaction.id"
				}
			}
		},
		"aggregations": {
			"docs_count": {
				"value_count": {
					"field": "@timestamp"
				}
			},
			"transaction.start": {
				"min": {
					"field": "@timestamp"
				}
			},
			"transaction.end": {
				"max": {
					"field": "@timestamp"
				}
			},
			"transaction.duration": {
				"bucket_script": {
					"buckets_path": {
						"start": "transaction.start.value",
						"end": "transaction.end.value"
					},
					"script": "(params.end - params.start)"
				}
			}
		}
	}
}

So I have a time-based index (myapp.logs) in which I store logs from an application. In particular, some of these logs report information about incoming HTTP requests and the related responses.
These are examples of source index documents:

[
	{
		"_id": "1",
		"_index": "myapp.logs",
		"_type": "_doc",
		"_source": {
			"@timestamp": "2020-10-21T09:00:00.000Z",
			"service.name": "myServiceA",
			"request.payload": "here some payload",
			"type": "request",
			"transaction.id": "abcd"
		}
	},
	{
		"_id": "2",
		"_index": "myapp.logs",
		"_type": "_doc",
		"_source": {
			"@timestamp": "2020-10-21T09:00:00.125Z",
			"service.name": "myServiceA",
			"response.payload": "here some payload",
			"type": "response",
			"transaction.id": "abcd"
		}
	},
	{
		"_id": "3",
		"_index": "myapp.logs",
		"_type": "_doc",
		"_source": {
			"@timestamp": "2020-10-21T09:00:00.050Z",
			"service.name": "myServiceA",
			"request.payload": "here some payload",
			"type": "request",
			"transaction.id": "defg"
		}
	},
	{
		"_id": "4",
		"_index": "myapp.logs",
		"_type": "_doc",
		"_source": {
			"@timestamp": "2020-10-21T09:00:00.375Z",
			"service.name": "myServiceA",
			"response.payload": "here some payload",
			"type": "response",
			"transaction.id": "defg"
		}
	}
]

As you can see, for each transaction (identified by the field transaction.id) there is a pair of documents in the source index, one reporting the request and the other reporting the response (the two can be distinguished by field type which has value "request" for the log of the request and "response" for the log of the response).

The transform, whose definition I posted above, should create a summary index, in which there is one document for each transaction.
The summary entity in the destination index should have the request timestamp (transaction.start), the response timestamp (transaction.end) and the duration of the transaction (transaction.duration), calculated as the difference between the response and the request timestamps.

The result from running the transform in batch mode on the sample documents shown above would be something like this:

[
	{
		"_id": "99",
		"_index": "myapp.transactions",
		"_type": "_doc",
		"_source": {
			"docs_count": 2,
			"transaction.id": "abcd",
			"transaction.start": "2020-10-21T09:00:00.000Z",
			"transaction.end": "2020-10-21T09:00:00.125Z",
			"transaction.duration": 125
		}
	},
	{
		"_id": "100",
		"_index": "myapp.transactions",
		"_type": "_doc",
		"_source": {
			"docs_count": 2,
			"transaction.id": "defg",
			"transaction.start": "2020-10-21T09:00:00.050Z",
			"transaction.end": "2020-10-21T09:00:00.375Z",
			"transaction.duration": 325
		}
	}
]
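To make the expected result concrete, here is a minimal Python sketch that reproduces the pivot on the four sample documents in memory. It does not call Elasticsearch; the helper names (to_millis, pivot) are made up for illustration, and it only mirrors the terms group_by plus the value_count/min/max/bucket_script aggregations from the transform definition.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def to_millis(ts):
    """Convert an ISO timestamp like 2020-10-21T09:00:00.125Z to epoch milliseconds."""
    parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")
    return (parsed - EPOCH) // timedelta(milliseconds=1)


def pivot(docs):
    """Group by transaction.id and compute count, start, end and duration (ms)."""
    groups = {}
    for doc in docs:
        groups.setdefault(doc["transaction.id"], []).append(to_millis(doc["@timestamp"]))
    return {
        tid: {
            "docs_count": len(stamps),
            "transaction.start": min(stamps),
            "transaction.end": max(stamps),
            "transaction.duration": max(stamps) - min(stamps),
        }
        for tid, stamps in groups.items()
    }


sample_docs = [
    {"@timestamp": "2020-10-21T09:00:00.000Z", "type": "request", "transaction.id": "abcd"},
    {"@timestamp": "2020-10-21T09:00:00.125Z", "type": "response", "transaction.id": "abcd"},
    {"@timestamp": "2020-10-21T09:00:00.050Z", "type": "request", "transaction.id": "defg"},
    {"@timestamp": "2020-10-21T09:00:00.375Z", "type": "response", "transaction.id": "defg"},
]

summary = pivot(sample_docs)
print(summary["abcd"]["transaction.duration"])  # 125
print(summary["defg"]["transaction.duration"])  # 325
```

The durations are in milliseconds because min and max on a date field yield epoch-millisecond values, which is what the bucket_script subtracts.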

I created the transform and so far it seems it is working.
However, since I'm continuously ingesting new documents in the source index, I want to be sure that the transform is not missing any updates.

Let me explain better.
I set the frequency parameter to 1m.
This means that only the logs that have been ingested in the source index in the interval (now-1m,now), where "now" is the time at which the transform is executed, should create or update entities in the destination index (suppose that the @timestamp field is also the exact time at which the document is ingested).

Suppose now that, when ES runs the transform and performs a new check to discover changes to the source index, for a given transaction (again identified by field transaction.id) it finds only the type=request log because the corresponding type=response log has not been ingested yet.
At this stage, the transform will create a new "transaction" entity in the destination index which will contain the results of aggregations done over only the type=request document (so for example, docs_count will be equal to 1 and transaction.duration to 0).

If I understood correctly the way transforms work, when the next cycle is run in which the type=response log is present in the source index, a new transaction entity (with the same value in field transaction.id) will be created but it will contain only aggregations done on the type=response document.
This is because the type=request document in the source index is outside of the time-window (now-1m,now) in which ES checks for changes.
Is this correct?

From the documentation on the Create Transform API it seems like I can set the parameter sync.time.delay "such that it accounts for data transmission delays".
What does this mean exactly?
The documentation here is not very clear.

If, for example, I set sync.time.delay to 60s (like in the example transform definition above), does that mean that every time the transform is run ES waits for updates to the source index that can occur up to 60s after the transform execution?

In the Limitations section of the documentation regarding transforms instead I found this:

A continuous transform periodically checks for changed entities between the time since it last checked and now minus sync.time.delay. This time window moves without overlapping. If the timestamp of a recently indexed document falls within this time window but this document is not yet available to search then this entity will not be updated.

If this statement above is correct, it means that in my case if the request log and response log of a given transaction happen to "fall" inside two distinct transform time-windows, they will create two distinct transaction entities in the destination index and there is no possibility to tell ES to look in the past before the 1m interval set by parameter frequency.

So my question is: is there a way to customize the interval over which ES performs the check for new updates on the source index, so that it is different from the "frequency" interval (i.e. the interval at which each new transform cycle is run)?

Hi,

thanks for your interest in continuous transforms and your very detailed feedback. Let me first answer your notes/questions.

This is not correct. frequency is only a trigger to check for changes. The value of frequency is not used for querying the data. When transform decides that a new run is necessary, it creates a new checkpoint; the query then uses the range (old_checkpoint_timestamp, new_checkpoint_timestamp] to detect changes. This is important, because you might have stopped the transform, or the transform might need more than 1m for one run (e.g. for checkpoint 1). As checkpoints are persisted, transform should not miss any data, even if a failure occurs. You can see checkpoint information as part of the _stats API.

Checkpointing is explained in the docs here. In the docs you will also see that there are 2 phases: 1st detecting changes, 2nd applying changes. Let me go into more details after your next note.

This is not correct. Transform runs in 2 phases, detection and applying. In your use case using a terms group_by it works like this:

  • create a new checkpoint, we call it t_new; let's call the former checkpoint t_old
  • transform queries with range (t_old, t_new] for changes
  • assume the changes query returns that transaction a, d and y changed
  • transform queries using a terms query for a, d and y in the range (0, t_new]
  • transform indexes the updates for a, d and y, but not for transaction b, c, ...

Note that the apply phase used range (0, t_new] but not (t_old, t_new]. This is important to see all values, e.g. for calculating duration.
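The two phases can be sketched in a few lines of Python. This is only an in-memory simulation under simplifying assumptions, not how the transform is implemented: timestamps are plain integers in milliseconds, the destination is a dict keyed by transaction.id, and the function names are invented for the example.

```python
def detect_changes(docs, t_old, t_new):
    """Phase 1: ids with at least one document in the window (t_old, t_new]."""
    return {d["transaction.id"] for d in docs if t_old < d["ts"] <= t_new}


def apply_changes(docs, changed_ids, t_new):
    """Phase 2: re-aggregate the changed ids over the one-sided range (0, t_new]."""
    stamps = {}
    for d in docs:
        if d["transaction.id"] in changed_ids and 0 < d["ts"] <= t_new:
            stamps.setdefault(d["transaction.id"], []).append(d["ts"])
    return {
        tid: {"docs_count": len(ts), "transaction.duration": max(ts) - min(ts)}
        for tid, ts in stamps.items()
    }


def run_checkpoint(docs, dest, t_old, t_new):
    """One cycle: detect, apply, then overwrite destination entries by group key."""
    changed = detect_changes(docs, t_old, t_new)
    dest.update(apply_changes(docs, changed, t_new))
    return changed


# The "abcd" response (60_025) falls after the first checkpoint (60_000).
source = [
    {"transaction.id": "defg", "ts": 59_000},
    {"transaction.id": "defg", "ts": 59_325},
    {"transaction.id": "abcd", "ts": 59_900},  # request
    {"transaction.id": "abcd", "ts": 60_025},  # response
]
dest = {}

first = run_checkpoint(source, dest, t_old=0, t_new=60_000)
after_first = dict(dest)
second = run_checkpoint(source, dest, t_old=60_000, t_new=120_000)

print(after_first["abcd"])  # {'docs_count': 1, 'transaction.duration': 0}
print(second)               # {'abcd'}
print(dest["abcd"])         # {'docs_count': 2, 'transaction.duration': 125}
```

After the second checkpoint there is still a single entry for "abcd", now computed from both documents, while "defg" was not touched because change detection did not report it.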

A downside is the requirement to keep old data, at least long enough so transform sees all required data. However, if in your case you know that one transaction cannot be longer than a certain amount of time, you can age out data in your source index. You can also use a query to define a lower boundary; that way you can prevent transform from querying e.g. cold nodes.

Change detection has several strategies, e.g. recently we introduced an optimization for a special case with date_histogram. The drawing in the issue might be useful to understand how it works.

Transform does not know when the timestamp of your transaction got created. If you e.g. created it from an application on a device, you have to account for processing times, network transmission, ingest execution and Lucene refresh. Assume your transaction got created with timestamp t: some time passes before transform can see the data point. This is what we call delay, and you should configure delay based on the worst case. There is also the option to move the timestamp used for sync closer to transform, e.g. not using a timestamp created on the device but using an ingest timestamp, created in Elasticsearch using an ingest pipeline. Note that even in this case you have a minimum delay, as Lucene indexes are refreshed with a 1s interval by default.

If we look at the phases again: When I said transform creates a checkpoint, it creates a checkpoint not using now() as timestamp but now() - delay. You can see this in the body response of _stats in the field time_upper_bound_millis.
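As a rough illustration of what delay buys you, the sketch below models the checkpoint upper bound as now minus delay and checks whether a document becomes searchable early enough. It is a simplified worst-case model with hypothetical function names, using integer milliseconds, and is not taken from the transform code.

```python
def checkpoint_upper_bound(now_ms, delay_ms):
    """Upper bound of a new checkpoint: now minus sync.time.delay."""
    return now_ms - delay_ms


def searchable_in_time(event_ts_ms, searchable_at_ms, delay_ms):
    """True if the document is searchable no later than its timestamp plus delay.

    The earliest checkpoint whose upper bound reaches event_ts_ms cannot be
    created before event_ts_ms + delay_ms, so a document that is searchable by
    then is visible to change detection. False means it may be missed.
    """
    return searchable_at_ms <= event_ts_ms + delay_ms


DELAY_MS = 60_000  # corresponds to "delay": "60s"

print(checkpoint_upper_bound(1_060_000, DELAY_MS))         # 1000000
print(searchable_in_time(1_000_000, 1_045_000, DELAY_MS))  # True: 45s of lag fits
print(searchable_in_time(1_000_000, 1_090_000, DELAY_MS))  # False: 90s of lag does not
```

In this model the lag between the timestamp and searchability (transmission, ingest, refresh) has to stay below delay, which is why delay should be sized for the worst case.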

That is not correct as I explained. Transform works in 2 phases, only the detection phase uses a two-sided window, the apply changes phase uses a 1-sided window.

I hope that makes it more clear. If you are interested to see this in action, you can change the log level of the transform indexer to see the queries it executes (creates log spam, only do this in a dev environment):

PUT /_cluster/settings
{
   "transient": {
      "logger.org.elasticsearch.xpack.transform.transforms": "trace"
   }
}

Last but not least, I would like to comment on your configuration:

I suggest deleting both query clauses, the exists query and the range query. The exists query is useless, as the terms grouping will drop buckets without a key anyhow (the next scheduled release will support grouping null buckets, but that has to be enabled using a parameter).

The range query is hostile for transform. It's dangerous, because you can break change detection with it. I hope that becomes clear with the explanations above. Never use lte or lt on the same field used for sync. This can cause data corruption. For gte or gt there is a valid use case: if you know that a transaction can e.g. not last for more than 1d, you can use now-1d to narrow the query. Search will not return different results, but you prevent some network calls in a bigger cluster that uses a hot-warm architecture.
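A possible shape for the source section after this advice, built as a Python dict so it can be printed as a request body. This is a sketch: build_source is a hypothetical helper, and now-1d is just the lower bound mentioned above, to be replaced with whatever maximum transaction length applies.

```python
import json


def build_source(index, lower_bound=None):
    """Build the transform "source" section without any upper bound on @timestamp.

    lower_bound is optional date math such as "now-1d"; only gte is ever set,
    so the field used for sync never gets an lte/lt restriction.
    """
    source = {"index": [index]}
    if lower_bound is not None:
        source["query"] = {"range": {"@timestamp": {"gte": lower_bound}}}
    return source


simple_source = build_source("myapp.logs")
narrowed_source = build_source("myapp.logs", lower_bound="now-1d")

print(json.dumps(narrowed_source, indent=2))
```

Starting with simple_source and only switching to narrowed_source after measuring a benefit matches the "start simple" recommendation.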

Last but not least, transform as well as elasticsearch applies optimizations under the hood whenever possible. By trying to "help" elasticsearch with query filters, you might actually gain opposite effects. Start simple and introduce optimizations after you measured the effect.


Hi @Hendrik_Muhs,

thank you very much for your detailed reply.
It clarified a lot of doubts.

So, if I understood correctly, if a change is detected in a transaction entity in the source index (i.e. the request and response logs that are correlated by the same value of transaction.id), the transform is always able to update the same transaction entity in the destination index because it uses the group_by fields specified in the pivot as a sort of ID of the document in the destination index, right?

Moreover the aggregations done in the apply phase are not executed only on the documents that have been created/updated in the time-window (previous_checkpoint_timestamp, current_checkpoint_timestamp] but over all the documents returned by the query in source.
This is in fact what I was hoping for.

Thanks also for the feedback on the transform definition I posted.
Indeed I thought I could "help" Elasticsearch by narrowing down the set of documents returned by the query.
In fact, I do not expect a transaction to last more than 15 min (that would be a really big problem for the application I am monitoring).

Regarding the delay, the @timestamp field which is present in the source index is in fact the timestamp of the log and not the ingest timestamp of the corresponding Elasticsearch document.
Now that I think about it, I'm ingesting this data through Logstash and sometimes the ingestion process could become very slow or halt for a brief period of time because Logstash is not working correctly.
So, to improve the performance of the transform, should I use the ingest timestamp as the sync.time.field, or should I just leave the @timestamp field as it is and use a very large value of sync.time.delay?

Yes, transform uses the configuration of the group_by fields to narrow the query when transform runs in continuous mode. Transform has various implementations for this so-called change detection phase. For terms we use a terms query; if you group by date_histogram, range queries are used, etc. As we query the full source index this is important, otherwise transform would do a full re-scan of the source.

(An important aspect: if your group_by uses a script, transform can't apply the above optimization. This is only a problem if all group_by use script)

If you use a large value for sync.time.delay the destination will lag behind. This might be ok, if you only look at the data once a day. If you want to be closer to real time, the ingest timestamp is the better option. It will additionally be more robust, when pre-processing is stuck.
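For the ingest timestamp option, one way it could look is an ingest pipeline with a set processor that copies the ingest metadata timestamp into a field, plus a sync section pointing at that field. The sketch below only builds the two request bodies as Python dicts; the field name event.ingested is an assumption chosen for the example, and the 60s delay is simply carried over from the original definition.

```python
import json

# Hypothetical field name for the ingest timestamp; any date field would do.
INGEST_FIELD = "event.ingested"

# Body for an ingest pipeline: the set processor stores the time at which
# Elasticsearch ingested the document.
pipeline_body = {
    "description": "Stamp each document with its ingest time",
    "processors": [
        {"set": {"field": INGEST_FIELD, "value": "{{{_ingest.timestamp}}}"}}
    ],
}

# "sync" section of the transform, now based on the ingest timestamp instead
# of the log's own @timestamp.
sync_body = {"time": {"field": INGEST_FIELD, "delay": "60s"}}

print(json.dumps(pipeline_body, indent=2))
print(json.dumps(sync_body, indent=2))
```

The pivot aggregations can keep using @timestamp for transaction.start and transaction.end; only change detection switches to the ingest field, so a slow or stalled Logstash no longer has to be covered by a large delay.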

Ok, then I think I will use the ingest timestamp as the time field to sync the transform.

Thank you very much.
