I received two new documents earlier, within a short time of each other.
Neither of these messages was reflected in the output of my transform, which is running in continuous mode.
However, if I run the same transform in the console with /_preview, the expected results are shown.
I set delay to 5s yesterday to counter any potential ingest delay, but I don't think a delay as long as 5s should be required if I am syncing on the @timestamp field, correct?
I checked the live transform's stats, and there was a checkpoint about 5 seconds after these messages. I didn't have any other messages at that time, so it looks like the transform ran...
I need help: why would the messages be missed in the transform output when they do show up in /_preview?
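To spell out how I understand delay and checkpoints to interact, here is a small Python sketch. It is only my simplified mental model, not the actual transform implementation: Doc, searchable_at and run_checkpoints are names made up for illustration, and the rule "a checkpoint only looks at sync-field values between the previous upper bound and now minus delay" is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    timestamp: float      # value of the sync field (@timestamp), in seconds
    searchable_at: float  # when the doc became visible to searches, in seconds


def run_checkpoints(docs, checkpoint_times, delay):
    """Return the indexes of docs picked up by at least one checkpoint.

    ASSUMPTION (simplified model): a checkpoint running at time `now` only
    considers docs with prev_upper < timestamp <= now - delay, and it can
    only see docs that are already searchable at `now`.
    """
    seen = set()
    prev_upper = float("-inf")
    for now in checkpoint_times:
        upper = now - delay
        for i, d in enumerate(docs):
            if prev_upper < d.timestamp <= upper and d.searchable_at <= now:
                seen.add(i)
        prev_upper = upper  # the next checkpoint starts where this one ended
    return seen


# A doc stamped at t=100 that only becomes searchable 7s later is never seen
# with delay=5, because its window has already closed by then.
late = run_checkpoints([Doc(100.0, 107.0)], range(100, 121), delay=5)
# The same doc becoming searchable after 3s is picked up.
on_time = run_checkpoints([Doc(100.0, 103.0)], range(100, 121), delay=5)
print(late, on_time)
```

If this model is right, a document is only lost when the gap between its @timestamp and the moment it becomes searchable exceeds delay, while /_preview would still show it because it does not depend on checkpoints.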
doc 1
@timestamp Jun 18, 2020 @ 07:56:18.383
message {"recv_timestamp":"2020-06-18T06:56:17.362975577Z","exchange":"dflgateway","instrument":"BTC-USDT","client_oid":"e7203f297b1044dbbc03f50c0a089383","order_id":"5092674538389505","side":"sell","price":9450,"quantity":0.0012,"limit_or_market":"limit","order_type":"0","status":"filled","state":2,"notional":0,"filled_notional":11.34,"filled_size":0.0012,"last_fill_px":9450,"last_fill_qty":0.0012,"last_fill_time":"2020-06-18T06:56:17.305000000Z","margin_trading":2,"exchange_timestamp":"2020-06-18T06:56:17.305000000Z","created_at":"2020-06-18T01:34:06.546000000Z","extra_fields":{"identity":"3548","logon_id":"master"}}
doc 2
@timestamp Jun 18, 2020 @ 07:56:18.383
message {"recv_timestamp":"2020-06-18T06:56:17.463456605Z","exchange":"dflgateway","instrument":"BTC-USDT","client_oid":"1b5fdec92b714cfdbc17afc73e3124be","order_id":"5093941407147008","side":"buy","price":9425,"quantity":0.0012,"limit_or_market":"limit","order_type":"0","status":"open","state":0,"notional":0,"filled_notional":0,"filled_size":0,"last_fill_px":0,"last_fill_qty":0,"last_fill_time":"1970-01-01T00:00:00.000000000Z","margin_trading":2,"exchange_timestamp":"2020-06-18T06:56:17.429000000Z","created_at":"2020-06-18T06:56:17.429000000Z","extra_fields":{"identity":"3548","logon_id":"master"}}
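For reference, the gap between recv_timestamp and @timestamp for these two documents can be computed as below. This is a rough sketch that assumes the @timestamp values shown above are displayed in local time at UTC+1 (my assumption, based on the one-hour offset from the UTC fields); parse_utc is a helper written for this example.

```python
from datetime import datetime, timedelta, timezone


def parse_utc(ts: str) -> datetime:
    """Parse an ISO-8601 'Z' timestamp, truncating nanoseconds to microseconds."""
    date_part, frac = ts.rstrip("Z").split(".")
    micros = frac[:6].ljust(6, "0")
    parsed = datetime.strptime(f"{date_part}.{micros}", "%Y-%m-%dT%H:%M:%S.%f")
    return parsed.replace(tzinfo=timezone.utc)


# ASSUMPTION: the displayed @timestamp is local time at UTC+1.
DISPLAY_TZ = timezone(timedelta(hours=1))
at_timestamp = datetime(2020, 6, 18, 7, 56, 18, 383000, tzinfo=DISPLAY_TZ)

recv = {
    "doc 1": "2020-06-18T06:56:17.362975577Z",
    "doc 2": "2020-06-18T06:56:17.463456605Z",
}

# Seconds between each document's recv_timestamp and its @timestamp.
lags = {name: (at_timestamp - parse_utc(ts)).total_seconds() for name, ts in recv.items()}
for name, lag in lags.items():
    print(f"{name}: @timestamp is {lag:.3f}s after recv_timestamp")
```

Under that assumption both gaps are about one second. This only compares the two fields; it says nothing about when the documents actually became searchable in the index.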
And here is my transform configuration:
{
  "id": "dflg_margin_limit_order_status_v7",
  "source": {
    "index": [
      "dflg-okex-account-orders*"
    ],
    "query": {
      "bool": {
        "filter": [
          {
            "bool": {
              "should": [
                {
                  "match": {
                    "margin_trading": 2
                  }
                }
              ],
              "minimum_should_match": 1
            }
          },
          {
            "bool": {
              "filter": [
                {
                  "bool": {
                    "should": [
                      {
                        "match_phrase": {
                          "limit_or_market": "limit"
                        }
                      }
                    ],
                    "minimum_should_match": 1
                  }
                }
              ]
            }
          }
        ]
      }
    }
  },
  "dest": {
    "index": "dflg_margin_limit_order_status"
  },
  "frequency": "1s",
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "5s"
    }
  },
  "pivot": {
    "group_by": {
      "order_id": {
        "terms": {
          "field": "order_id.keyword"
        }
      },
      "side": {
        "terms": {
          "field": "side.keyword"
        }
      },
      "datatype": {
        "terms": {
          "field": "datatype.keyword"
        }
      },
      "extra_fields.identity": {
        "terms": {
          "field": "extra_fields.identity.keyword"
        }
      },
      "instrument": {
        "terms": {
          "field": "instrument.keyword"
        }
      },
      "quantity": {
        "histogram": {
          "field": "quantity",
          "interval": "0.00000001"
        }
      },
      "price": {
        "histogram": {
          "field": "price",
          "interval": "0.00000000001"
        }
      }
    },
    "aggregations": {
      "filled_notional.max": {
        "max": {
          "field": "filled_notional"
        }
      },
      "created_at": {
        "max": {
          "field": "created_at"
        }
      },
      "recv_timestamp": {
        "max": {
          "field": "recv_timestamp"
        }
      },
      "filled_size.max": {
        "max": {
          "field": "filled_size"
        }
      },
      "last_status": {
        "scripted_metric": {
          "init_script": "state.filled_size_latest = 0.0; state.last_status = ''; state.recv_timestamp_latest = 0L",
          "map_script": "\n def current_filled_size = doc['filled_size'].value;\n def current_recv_timestamp = doc.recv_timestamp.value.toInstant().toEpochMilli();\n if (current_filled_size > state.filled_size_latest)\n {\n state.filled_size_latest = current_filled_size;\n state.last_status = params['_source']['status'];\n state.recv_timestamp_latest = current_recv_timestamp;\n }\n else if (current_recv_timestamp > state.recv_timestamp_latest)\n {\n state.filled_size_latest = current_filled_size;\n state.last_status = params['_source']['status'];\n state.recv_timestamp_latest = current_recv_timestamp;\n }\n else if (params['_source']['status'] == 'cancelled')\n {\n state.last_status = params['_source']['status'];\n }\n ",
          "combine_script": "return state",
          "reduce_script": "\n def last_status = '';\n def filled_size_latest = 0L;\n def recv_timestamp_latest = 0.0;\n for (s in states) \n {\n if (s.filled_size_latest > filled_size_latest)\n {\n filled_size_latest = s.filled_size_latest; last_status = s.last_status;\n }\n else if (s.recv_timestamp_latest > recv_timestamp_latest)\n {\n recv_timestamp_latest = s.recv_timestamp_latest; last_status = s.last_status;\n }\n }\n return last_status\n "
        }
      }
    }
  },
  "version": "7.5.2",
  "create_time": 1592406746768
}
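To make the last_status scripted metric easier to reason about outside Elasticsearch, here is a rough Python port of the init, map and reduce scripts above. It is a sketch for illustration only: the function names and the recv_timestamp_ms field are made up for this example, documents are plain dicts, the epoch-millisecond values are hand-computed approximations of the recv_timestamp values in the two documents, and I have not checked it against how Painless handles types.

```python
def init_state():
    # Mirrors init_script.
    return {"filled_size_latest": 0.0, "last_status": "", "recv_timestamp_latest": 0}


def map_doc(state, doc):
    # Mirrors map_script; `doc` is a dict with filled_size, recv_timestamp_ms, status.
    size = doc["filled_size"]
    ts = doc["recv_timestamp_ms"]
    if size > state["filled_size_latest"] or ts > state["recv_timestamp_latest"]:
        # Both of the first two branches in map_script perform the same update.
        state["filled_size_latest"] = size
        state["last_status"] = doc["status"]
        state["recv_timestamp_latest"] = ts
    elif doc["status"] == "cancelled":
        state["last_status"] = doc["status"]


def reduce_states(states):
    # Mirrors reduce_script: each branch updates only one of the two trackers.
    last_status = ""
    filled_size_latest = 0
    recv_timestamp_latest = 0.0
    for s in states:
        if s["filled_size_latest"] > filled_size_latest:
            filled_size_latest = s["filled_size_latest"]
            last_status = s["last_status"]
        elif s["recv_timestamp_latest"] > recv_timestamp_latest:
            recv_timestamp_latest = s["recv_timestamp_latest"]
            last_status = s["last_status"]
    return last_status


def last_status_for(docs):
    # Single-shard case: one state, mapped over all docs, then reduced.
    state = init_state()
    for d in docs:
        map_doc(state, d)
    return reduce_states([state])


# The two documents above belong to different order_id groups, so each is its own bucket.
doc1 = {"filled_size": 0.0012, "recv_timestamp_ms": 1592463377362, "status": "filled"}
doc2 = {"filled_size": 0.0, "recv_timestamp_ms": 1592463377463, "status": "open"}
print(last_status_for([doc1]), last_status_for([doc2]))
```

With these inputs the port yields "filled" for the first order and "open" for the second, which matches what /_preview shows. One thing the port makes visible: in the reduce step, a state with a smaller filled size but a non-zero recv_timestamp can still overwrite last_status through the second branch, because the first branch never updates recv_timestamp_latest.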