Getting the first and last entry(by timestamp) for each filename

I have an index with around 140 milllion documents stored.

The fields in general look like this: systemname, filename, timestamp, message, version, etc,

What I want to get is the first and last document (based on the timestamp) per filename.

What I am doing right now is:

  • Query the whole index
  • Define window based on filename
  • Sort ascending
  • Add rownumber
  • Add max_rownumber
  • Filter all entries where rownumber = 1 or rownumber = max_rownumber

This is really really slow.

My second option was to sort it ascending add rownumber find number 1 and do the same sorted descending. This is as slow if not even slower than the first approach.

Code is as follows:
val ascFileWindow = Window.partitionBy($"systemname", $"filename").orderBy($"timestamp".asc)
val ds2_first = ds_filtered
.withColumn("rn", row_number().over(ascFileWindow))
.withColumn("max_rn", max($"rn").over(ascFileWindow.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))
.withColumn("lf_flag", when($"rn" === 1 || $"rn" === $"max_rn", 1).otherwise(lit(0)))
.where($"lf_flag" === 1)

ES Version: 6.0.0
ES Spark Connector: elasticsearch-spark-20 6.0.0
Scala Version 2.11.8
Spark Version: 2.2

If someone could help me with a solution that would be really nice. I am out of Options.
Not being able to query aggregations seems to become a dealbreaker for elasticsearch but I don't want to switch away :slight_smile:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.