Transform - can I "group by" splitting values in one row?

I'd like to add a dimension (a single field in each document whose value is "buy" or "sell") to create 2 values in a single output record/row.

So in Excel, this would look like splitting a value into 2 columns.

Is this possible in a transform? Either as a group by/dimension that creates 2 values, or by filtering each value I produce, so that I get a buy_metric and a sell_metric?

Ultimately, I would like the values output in one record, not a simple "group by" that produces 2 result records.

Thanks
Mark

Hello @Mark_Duncan

Would it be possible to get an example of the document(s) and the result you wish to get?

I've just taken this document from the discover page...

I'd like to use the field "side" as described, so my output could have the following columns, where "last_fill_qty" and "last_fill_px" are existing fields/values in a document, and where I will turn multiple documents for a day into one row to summarise the day.

timestamp(day), sell_last_fill_qty, buy_last_fill_qty

I could make these metrics as Scripted fields for the index pattern, but then they just appear as 0s in the transform output...

I would add additional calculated metrics in my transform output, too.

Thanks in advance!
Mark

{
"_index": "blah-user-orders-2020.05.05",
"_type": "_doc",
"_id": "Ncc45HEB7Yw4QDcGDsLb",
"_version": 1,
"_score": null,
"_source": {
"recv_timestamp": "2020-05-05T09:43:25.838916Z",
"last_fill_time": "0001-01-01T00:00:00.000000Z",
"notional": 0,
"side": "sell",
"quantity": 0.0017,
"instrument": "blah",
"limit_or_market": "limit",
"type": "orders",
"last_fill_qty": 0.0017,
"message": "{"recv_timestamp":"2020-05-05T09:43:25.838916Z","exchange":"blah","instrument":"blah","client_oid":"","order_id":"4845450774989824","side":"sell","price":9050.2,"quantity":0.0017,"limit_or_market":"limit","order_type":"0","status":"filled","notional":0,"filled_notional":15.38534,"filled_size":0,"last_fill_px":9050.2,"last_fill_qty":0.0017,"last_fill_time":"0001-01-01T00:00:00.000000Z","margin_trading":2,"exchange_timestamp":"2020-05-05T09:43:25.830000Z","extra_fields":null}",
"client_oid": "",
"last_fill_px": 9050.2,
"filled_notional": 15.38534,
"@timestamp": "2020-05-05T09:43:30.626Z",
"exchange_timestamp": "2020-05-05T09:43:25.830000Z",
"price": 9050.2,
"filled_size": 0,
"extra_fields": null,
"exchange": "blah",
"margin_trading": 2,
"order_type": "0",
"order_id": "4845450774989824",
"status": "filled"
},
"fields": {
"recv_timestamp": [
"2020-05-05T09:43:25.838Z"
],
"last_fill_time": [
"0001-01-01T00:00:00.000Z"
],
"+/- usdt traded test": [
15.385340536004719
],
"@timestamp": [
"2020-05-05T09:43:30.626Z"
],
"exchange_timestamp": [
"2020-05-05T09:43:25.830Z"
],
"USDT_traded": [
15.385340536004719
],
"+/- base traded": [
-0.0017000000225380063
]
},
"highlight": {
"status": [
"@kibana-highlighted-field@filled@/kibana-highlighted-field@"
]
},
"sort": [
1588671805830
]
}

Hello @Mark_Duncan, thank you for clarifying the question.

Side comment: we're improving the date_nanos datatype, but aggregations cannot have nanosecond granularity at the moment. That's not an issue in this case.

Demo data

PUT discusstransform
{
  "mappings": {
    "recv_timestamp": {
      "type": "date_nanos"
    },
    "last_fill_time": {
      "type": "date_nanos"
    },
    "exchange_timestamp": {
      "type": "date_nanos"
    }
  }
}

PUT discusstransform/_doc/1
{
  "recv_timestamp": "2020-05-05T09:43:25.838916Z",
  "last_fill_time": "0001-01-01T00:00:00.000000Z",
  "notional": 0,
  "side": "sell",
  "quantity": 0.0017,
  "instrument": "blah",
  "limit_or_market": "limit",
  "type": "orders",
  "last_fill_qty": 0.0017,
  "client_oid": "",
  "last_fill_px": 9050.2,
  "filled_notional": 15.38534,
  "@timestamp": "2020-05-05T09:43:30.626Z",
  "exchange_timestamp": "2020-05-05T09:43:25.830000Z",
  "price": 9050.2,
  "filled_size": 0,
  "extra_fields": null,
  "exchange": "blah",
  "margin_trading": 2,
  "order_type": "0",
  "order_id": "4845450774989824",
  "status": "filled"
}

PUT discusstransform/_doc/2
{
  "recv_timestamp": "2020-05-05T09:43:25.838916Z",
  "last_fill_time": "0001-01-01T00:00:00.000000Z",
  "notional": 0,
  "side": "buy",
  "quantity": 0.0017,
  "instrument": "blah",
  "limit_or_market": "limit",
  "type": "orders",
  "last_fill_qty": 0.0017,
  "client_oid": "",
  "last_fill_px": 9050.2,
  "filled_notional": 15.38534,
  "@timestamp": "2020-05-05T09:43:30.626Z",
  "exchange_timestamp": "2020-05-05T09:43:25.830000Z",
  "price": 9050.2,
  "filled_size": 0,
  "extra_fields": null,
  "exchange": "blah",
  "margin_trading": 2,
  "order_type": "0",
  "order_id": "4845450774989824",
  "status": "filled"
}

I'm not sure exactly which metric or value you want to extract from last_fill_qty.

The following transform will generate two documents per day: one for the sell side and one for the buy side.

POST _transform/_preview
{
  "source": {
    "index": [
      "discusstransform"
    ]
  },
  "dest": {
    "index": "transform_discusstransform"
  },
  "pivot": {
    "group_by": {
      "day": {
        "date_histogram": {
          "field": "recv_timestamp",
          "calendar_interval": "1d"
        }
      },
      "side": {
        "terms": {
          "field": "side.keyword"
        }
      }
    },
    "aggregations": {
      "sum": {
        "sum": {
          "field": "last_fill_qty"
        }
      },
      "avg": {
        "avg": {
          "field": "last_fill_qty"
        }
      },
      "min": {
        "min": {
          "field": "last_fill_qty"
        }
      },
      "max": {
        "max": {
          "field": "last_fill_qty"
        }
      }
    }
  }
}
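To see why this pivot yields two rows, here is a minimal Python sketch (hypothetical, not Elasticsearch code) of grouping the two demo documents by (day, side) and computing the same metrics:

```python
from collections import defaultdict

# The two demo documents, reduced to the fields the pivot uses.
docs = [
    {"recv_timestamp": "2020-05-05", "side": "sell", "last_fill_qty": 0.0017},
    {"recv_timestamp": "2020-05-05", "side": "buy", "last_fill_qty": 0.0017},
]

# group_by: date_histogram on recv_timestamp + terms on side.keyword
groups = defaultdict(list)
for doc in docs:
    groups[(doc["recv_timestamp"], doc["side"])].append(doc["last_fill_qty"])

# aggregations: sum/avg/min/max of last_fill_qty per group
rows = [
    {"day": day, "side": side, "sum": sum(v), "avg": sum(v) / len(v),
     "min": min(v), "max": max(v)}
    for (day, side), v in groups.items()
]
# Two output rows: one for the sell side, one for the buy side.
```

Because "side" is part of the group_by key, each day necessarily produces one row per distinct side value.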

If you want to obtain a single document, another option is to use a scripted_metric aggregation.

Example: how to obtain the latest value of last_fill_qty for buy and sell, per day, based on the recv_timestamp.

POST _transform/_preview
{
  "source": {
    "index": [
      "discusstransform"
    ]
  },
  "dest": {
    "index": "transform_discusstransform"
  },
  "pivot": {
    "group_by": {
      "day": {
        "date_histogram": {
          "field": "recv_timestamp",
          "calendar_interval": "1d"
        }
      }
    },
    "aggregations": {
      "values": {
        "scripted_metric": {
          "init_script": """
          state.last_fill_qty = ['sell':0,'buy':0];
          state.recv_timestamp_last = ['sell':0,'buy':0];
          """,
          "map_script": """
          def recv_timestamp = doc.recv_timestamp.value.toInstant().toEpochMilli();
          if(doc['side.keyword'].value == 'sell') {
            if(state.recv_timestamp_last.sell < recv_timestamp) {
              state.last_fill_qty.sell = doc.last_fill_qty.value;
              state.recv_timestamp_last.sell = recv_timestamp;
            }
          } else if (doc['side.keyword'].value == 'buy') {
            if(state.recv_timestamp_last.buy < recv_timestamp) {
              state.last_fill_qty.buy = doc.last_fill_qty.value;
              state.recv_timestamp_last.buy = recv_timestamp;
            }
          }
          """,
          "combine_script": "return state;",
          "reduce_script": """
          def last_fill_qty = ['sell':0,'buy':0];
          def recv_timestamp_last = ['sell':0,'buy':0];
          for(s in states) {
              if(s.recv_timestamp_last.sell > recv_timestamp_last.sell) {
                last_fill_qty.sell = s.last_fill_qty.sell;
                recv_timestamp_last.sell = s.recv_timestamp_last.sell;
              }
              if(s.recv_timestamp_last.buy > recv_timestamp_last.buy) {
                last_fill_qty.buy = s.last_fill_qty.buy;
                recv_timestamp_last.buy = s.recv_timestamp_last.buy;
              }
          }
          return ['sell_last_fill_qty': last_fill_qty.sell, 'buy_last_fill_qty': last_fill_qty.buy];
          """
        }
      }
    }
  }
}

Result:

{
  "preview" : [
    {
      "values" : {
        "sell_last_fill_qty" : 0.0017000000225380063,
        "buy_last_fill_qty" : 0.0017000000225380063
      },
      "day" : 1588636800000
    }
  ],
  "mappings" : {
    "properties" : {
      "day" : {
        "type" : "date"
      }
    }
  }
}
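The map/combine/reduce phases of the scripted_metric above can be mirrored in Python (a hypothetical sketch, not Elasticsearch code): each shard builds a state with the latest last_fill_qty per side, and the reduce step picks the latest value across shard states.

```python
def map_latest(docs):
    """map_script: keep the latest last_fill_qty per side on one shard."""
    state = {"last_fill_qty": {"sell": 0, "buy": 0},
             "recv_timestamp_last": {"sell": 0, "buy": 0}}
    for doc in docs:
        side = doc["side"]
        if doc["recv_timestamp"] > state["recv_timestamp_last"][side]:
            state["last_fill_qty"][side] = doc["last_fill_qty"]
            state["recv_timestamp_last"][side] = doc["recv_timestamp"]
    return state

def reduce_latest(states):
    """reduce_script: pick the latest value per side across shard states."""
    last_fill_qty = {"sell": 0, "buy": 0}
    recv_last = {"sell": 0, "buy": 0}
    for s in states:
        for side in ("sell", "buy"):
            if s["recv_timestamp_last"][side] > recv_last[side]:
                last_fill_qty[side] = s["last_fill_qty"][side]
                recv_last[side] = s["recv_timestamp_last"][side]
    return {"sell_last_fill_qty": last_fill_qty["sell"],
            "buy_last_fill_qty": last_fill_qty["buy"]}
```

Carrying the timestamp alongside the value is what makes the reduce phase correct regardless of which shard a document lives on.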

@Luca_Belluccini that is amazing, thank you :smiley:

I had used Scripted fields for an index pattern before, but not a scripted metric in a transform.

Being really cheeky, would you have another example you could share for a sum scripted metric? i.e. I want to sum the last_fill_qty field of the day's side:buy documents as my metric.

Many thanks,
Mark

Starting with Elasticsearch 7.7, you will be able to use filter aggregations in a transform:

POST _transform/_preview
{
  "source": {
    "index": [
      "discusstransform"
    ]
  },
  "dest": {
    "index": "transform_discusstransform"
  },
  "pivot": {
    "group_by": {
      "day": {
        "date_histogram": {
          "field": "recv_timestamp",
          "calendar_interval": "1d"
        }
      }
    },
    "aggregations": {
      "sell": {
        "filter": {
          "term": {
            "side.keyword": "sell"
          }
        },
        "aggs": {
          "sum": {
            "sum": {
              "field": "last_fill_qty"
            }
          }
        }
      },
      "buy": {
        "filter": {
          "term": {
            "side.keyword": "buy"
          }
        },
        "aggs": {
          "sum": {
            "sum": {
              "field": "last_fill_qty"
            }
          }
        }
      }
    }
  }
}

Up to and including 7.6, use a scripted_metric:

POST _transform/_preview
{
  "source": {
    "index": [
      "discusstransform"
    ]
  },
  "dest": {
    "index": "transform_discusstransform"
  },
  "pivot": {
    "group_by": {
      "day": {
        "date_histogram": {
          "field": "recv_timestamp",
          "calendar_interval": "1d"
        }
      }
    },
    "aggregations": {
      "values": {
        "scripted_metric": {
          "init_script": """
          state.last_fill_qty = ['sell':0,'buy':0];
          """,
          "map_script": """
          if(doc['side.keyword'].value == 'sell') {
            state.last_fill_qty.sell = doc.last_fill_qty.value;
          } else if (doc['side.keyword'].value == 'buy') {
            state.last_fill_qty.buy = doc.last_fill_qty.value;
          }
          """,
          "combine_script": "return state;",
          "reduce_script": """
          def last_fill_qty = ['sell':0,'buy':0];
          for(s in states) {
            last_fill_qty.sell = s.last_fill_qty.sell;
            last_fill_qty.buy = s.last_fill_qty.buy;
          }
          return ['sell_last_fill_qty': last_fill_qty.sell, 'buy_last_fill_qty': last_fill_qty.buy];
          """
        }
      }
    }
  }
}

Hey @Luca_Belluccini roll on v7.7 :smiley:

Thanks again for your help here. I've tried implementing your "Until 7.6" code, but it appears to return only a single value - the equivalent of a top hit of the final record. I get the same result as your previous reply.

However, I was hoping to get a sum of all "last_fill_qty" values from multiple documents for that day. Could you advise on the change needed, please?

Closer than you might think.

:man_facepalming: It doesn't work because I forgot to ... add the values!
Also, initialise the state with 0.0, or the sum will be done on integers.

POST _transform/_preview
{
  "source": {
    "index": [
      "discusstransform"
    ]
  },
  "dest": {
    "index": "transform_discusstransform"
  },
  "pivot": {
    "group_by": {
      "day": {
        "date_histogram": {
          "field": "recv_timestamp",
          "calendar_interval": "1d"
        }
      }
    },
    "aggregations": {
      "values": {
        "scripted_metric": {
          "init_script": """
          state.last_fill_qty = ['sell':0.0,'buy':0.0];
          """,
          "map_script": """
          if(doc['side.keyword'].value == 'sell') {
            state.last_fill_qty.sell += doc.last_fill_qty.value;
          } else if (doc['side.keyword'].value == 'buy') {
            state.last_fill_qty.buy += doc.last_fill_qty.value;
          }
          """,
          "combine_script": "return state;",
          "reduce_script": """
          def last_fill_qty = ['sell':0.0,'buy':0.0];
          for(s in states) {
            last_fill_qty.sell += s.last_fill_qty.sell;
            last_fill_qty.buy += s.last_fill_qty.buy;
          }
          return ['sell_last_fill_qty': last_fill_qty.sell, 'buy_last_fill_qty': last_fill_qty.buy];
          """
        }
      }
    }
  }
}
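The fix is that both the map and reduce phases must accumulate with `+=`. In Python terms (a hypothetical sketch, not Elasticsearch code):

```python
def map_sums(docs):
    """map_script: accumulate per-side sums on one shard (note +=)."""
    state = {"sell": 0.0, "buy": 0.0}  # 0.0, so sums stay floating point
    for doc in docs:
        if doc["side"] in state:
            state[doc["side"]] += doc["last_fill_qty"]
    return state

def reduce_sums(states):
    """reduce_script: add the per-shard partial sums together."""
    total = {"sell": 0.0, "buy": 0.0}
    for s in states:
        total["sell"] += s["sell"]
        total["buy"] += s["buy"]
    return {"sell_last_fill_qty": total["sell"],
            "buy_last_fill_qty": total["buy"]}
```

With plain assignment instead of `+=`, each new document would overwrite the previous value, which is exactly the "top hit of the final record" behaviour observed above.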

:smiley: I see it's been released

Thanks so much for your help mate :+1:


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.