Continuous transform missing some documents, but they show in /_preview

I received 2 new documents earlier within a short time frame.
Neither of these msgs was reflected in the output of my transform, which is running in continuous mode.
However, if I run the same transform in the console with /_preview, the expected results are shown.
I set delay to 5s yesterday to counter any potential ingest delay, but I wouldn't have thought a delay as long as 5s should be required if I am using the @timestamp field, correct?

I checked the live transform's stats, and there was a checkpoint roughly 5 seconds after these msgs. I didn't have any other msgs at that time, so it looks like the transform ran...
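For reference, I pulled the checkpoint details with the stats API, i.e. something like:

GET _transform/dflg_margin_limit_order_status_v7/_stats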

I need help - why would the msgs get missed in the transform output, when they do show up in /_preview?

doc 1
@timestamp Jun 18, 2020 @ 07:56:18.383
message {"recv_timestamp":"2020-06-18T06:56:17.362975577Z","exchange":"dflgateway","instrument":"BTC-USDT","client_oid":"e7203f297b1044dbbc03f50c0a089383","order_id":"5092674538389505","side":"sell","price":9450,"quantity":0.0012,"limit_or_market":"limit","order_type":"0","status":"filled","state":2,"notional":0,"filled_notional":11.34,"filled_size":0.0012,"last_fill_px":9450,"last_fill_qty":0.0012,"last_fill_time":"2020-06-18T06:56:17.305000000Z","margin_trading":2,"exchange_timestamp":"2020-06-18T06:56:17.305000000Z","created_at":"2020-06-18T01:34:06.546000000Z","extra_fields":{"identity":"3548","logon_id":"master"}}

doc 2
@timestamp Jun 18, 2020 @ 07:56:18.383
message {"recv_timestamp":"2020-06-18T06:56:17.463456605Z","exchange":"dflgateway","instrument":"BTC-USDT","client_oid":"1b5fdec92b714cfdbc17afc73e3124be","order_id":"5093941407147008","side":"buy","price":9425,"quantity":0.0012,"limit_or_market":"limit","order_type":"0","status":"open","state":0,"notional":0,"filled_notional":0,"filled_size":0,"last_fill_px":0,"last_fill_qty":0,"last_fill_time":"1970-01-01T00:00:00.000000000Z","margin_trading":2,"exchange_timestamp":"2020-06-18T06:56:17.429000000Z","created_at":"2020-06-18T06:56:17.429000000Z","extra_fields":{"identity":"3548","logon_id":"master"}}

And here is my transform configuration:

{
  "id": "dflg_margin_limit_order_status_v7",
  "source": {
    "index": [
      "dflg-okex-account-orders*"
    ],
    "query": {
      "bool": {
        "filter": [
          {
            "bool": {
              "should": [
                {
                  "match": {
                    "margin_trading": 2
                  }
                }
              ],
              "minimum_should_match": 1
            }
          },
          {
            "bool": {
              "filter": [
                {
                  "bool": {
                    "should": [
                      {
                        "match_phrase": {
                          "limit_or_market": "limit"
                        }
                      }
                    ],
                    "minimum_should_match": 1
                  }
                }
              ]
            }
          }
        ]
      }
    }
  },
  "dest": {
    "index": "dflg_margin_limit_order_status"
  },
  "frequency": "1s",
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "5s"
    }
  },
  "pivot": {
    "group_by": {
      "order_id": {
        "terms": {
          "field": "order_id.keyword"
        }
      },
      "side": {
        "terms": {
          "field": "side.keyword"
        }
      },
      "datatype": {
        "terms": {
          "field": "datatype.keyword"
        }
      },
      "extra_fields.identity": {
        "terms": {
          "field": "extra_fields.identity.keyword"
        }
      },
      "instrument": {
        "terms": {
          "field": "instrument.keyword"
        }
      },
      "quantity": {
        "histogram": {
          "field": "quantity",
          "interval": "0.00000001"
        }
      },
      "price": {
        "histogram": {
          "field": "price",
          "interval": "0.00000000001"
        }
      }
    },
    "aggregations": {
      "filled_notional.max": {
        "max": {
          "field": "filled_notional"
        }
      },
      "created_at": {
        "max": {
          "field": "created_at"
        }
      },
      "recv_timestamp": {
        "max": {
          "field": "recv_timestamp"
        }
      },
      "filled_size.max": {
        "max": {
          "field": "filled_size"
        }
      },
      "last_status": {
        "scripted_metric": {
          "init_script": "state.filled_size_latest = 0.0; state.last_status = ''; state.recv_timestamp_latest = 0L",
          "map_script": "\n def current_filled_size = doc['filled_size'].value;\n def current_recv_timestamp = doc.recv_timestamp.value.toInstant().toEpochMilli();\n if (current_filled_size > state.filled_size_latest)\n {\n state.filled_size_latest = current_filled_size;\n state.last_status = params['_source']['status'];\n state.recv_timestamp_latest = current_recv_timestamp;\n }\n else if (current_recv_timestamp > state.recv_timestamp_latest)\n {\n state.filled_size_latest = current_filled_size;\n state.last_status = params['_source']['status'];\n state.recv_timestamp_latest = current_recv_timestamp;\n }\n else if (params['_source']['status'] == 'cancelled')\n {\n state.last_status = params['_source']['status'];\n }\n ",
          "combine_script": "return state",
          "reduce_script": "\n def last_status = '';\n def filled_size_latest = 0L;\n def recv_timestamp_latest = 0.0;\n for (s in states) \n {\n if (s.filled_size_latest > filled_size_latest)\n {\n filled_size_latest = s.filled_size_latest; last_status = s.last_status;\n }\n else if (s.recv_timestamp_latest > recv_timestamp_latest)\n {\n recv_timestamp_latest = s.recv_timestamp_latest; last_status = s.last_status;\n }\n }\n return last_status\n "
        }
      }
    }
  },
  "version": "7.5.2",
  "create_time": 1592406746768
}

Perhaps I can summarise my question here - my continuous transform is live; it recorded a checkpoint 5s after new docs arrived, but failed to update the transform output at that time with the new documents.

How do I solve this?

The expected results - including the new documents - are shown to me if I call /_preview in the console.

When a checkpoint is created, let's say at time x, it creates an upper bound of x - delay and then requires that any new data is after x - delay. If you insert/update old data, a continuous transform is not able to see the change. This is how continuous transforms scale. The docs go into a bit more detail about how this works.
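To make that concrete with the numbers from this thread (an approximation of the change-detection query, not the literal one the transform issues): with delay = 5s, a checkpoint taken at 06:56:28 UTC only considers documents in a window like the one below, and once the window has moved past a document's @timestamp that document is never revisited.

GET dflg-okex-account-orders*/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2020-06-18T06:56:18.000Z",
        "lt": "2020-06-18T06:56:23.000Z"
      }
    }
  }
}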

Having said that, we need to find out why this did not work for you:

  • How is @timestamp created?
  • What kind of ingest processing is running upfront?

By setting a delay of 5s you assume quite a low ingest processing delay. Note that 1s is already the minimum if you use the default refresh_interval. The default delay is 60s.

  • Do you use the default, or did you change the refresh_interval of the source index? (One way to check is shown below.)
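The default only appears in the settings response if you ask for it:

GET dflg-okex-account-orders*/_settings?include_defaults=true&filter_path=**.refresh_interval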

The reason why _preview works is simple: it creates the preview on the full data set. A new transform would also see the 2 documents that your continuous transform missed.

Thanks @Hendrik_Muhs

@timestamp is the timestamp added by Logstash as metadata, so I think it ought to be (close to) the time the document is indexed by Elasticsearch.
As for ingest processing: Logstash runs upfront.

I had set it to 5s in an attempt to solve my issue. I'd certainly like to reduce it - and have the destination index updated as rapidly as possible.

Msgs to the source index could be quite sporadic - I may not have a msg for an hour, for instance. When a new msg/document is received, I'd like it to appear in the transform destination index ASAP.

I have not changed the refresh_interval of the source index.

Thanks for your help

Hi @Hendrik_Muhs

Any thoughts on why the transform would run but miss some docs?

I think I've seen that updates made via the /_update API need the transform to be stopped and restarted to fully take effect.

That being said, I've still seen 1 doc missed since I stopped and restarted the transform.

Using the _update API is not supported for continuous transforms; it may or may not work. In a nutshell, a continuous transform works in 2 steps:

  1. find the changes
  2. recalculate the changed buckets

If you use _update, the change is not visible to step 1. However, if you change the bucket with another non-_update change at almost the same time, you get lucky.
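For example (the index name and document ID here are made up), a call like this changes a document without touching @timestamp, so the change-detection query on the sync field never picks it up:

POST dflg-okex-account-orders-000001/_update/1
{
  "doc": {
    "status": "cancelled"
  }
}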

To see what the transform does behind the scenes, you can use:

PUT /_cluster/settings
{
   "transient": {
      "logger.org.elasticsearch.xpack.transform.transforms": "trace"
   }
}

to see the queries it runs.
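Remember to turn it off again afterwards; setting the transient value to null restores the default log level:

PUT /_cluster/settings
{
   "transient": {
      "logger.org.elasticsearch.xpack.transform.transforms": null
   }
}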

Sporadic messages shouldn't be a problem; however, Logstash has a batch delay. I still think the docs are missed due to ingest delays.

Again, moving the timestamp creation to ingest could help to get it closer to indexing time. If you want to keep the Logstash one, you can use another timestamp field (which might also be helpful for debugging, e.g. by aggregating the difference between the two).
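A minimal sketch of the ingest-side variant (the pipeline and field names are my own, not an established convention):

PUT _ingest/pipeline/set-ingest-time
{
  "processors": [
    {
      "set": {
        "field": "ingest_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

PUT dflg-okex-account-orders*/_settings
{
  "index.default_pipeline": "set-ingest-time"
}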

Add the following filter in Logstash and it'll tag each record with the time the record was processed by Logstash. You'll need to update your index template to say 'ingest_epoch' is a date, because it'll default to being mapped as a long.

  ruby {
     code => 'event.set("ingest_epoch", (Time.now.to_f * 1000).to_i)'
  }
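The template change could look something like this (template name assumed; merge the property into your existing template rather than replacing it; epoch_millis matches the millisecond value the Ruby snippet writes):

PUT _template/dflg-okex-account-orders
{
  "index_patterns": ["dflg-okex-account-orders*"],
  "mappings": {
    "properties": {
      "ingest_epoch": {
        "type": "date",
        "format": "epoch_millis"
      }
    }
  }
}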

Then, in your continuous transform, use ingest_epoch as your sync field with a delay of, say, 60 seconds. That will ensure the transform processes all records added to the index (about a minute after they arrive), regardless of the time period in @timestamp. Then use a group_by on a date histogram derived from @timestamp to get it to overwrite the records for each time slot...
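Concretely, the relevant fragments of the transform config would look something like this (the group_by name and interval are illustrative):

"sync": {
  "time": {
    "field": "ingest_epoch",
    "delay": "60s"
  }
},
"pivot": {
  "group_by": {
    "time_slot": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "1m"
      }
    }
  }
}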

Thanks @Hendrik_Muhs, I added the ingest_timestamp and it's been good ever since. Thank you!
