I have an index with around 140 million documents.
The fields generally look like this: systemname, filename, timestamp, message, version, etc.
What I want to get is the first and last document (based on the timestamp) per filename.
What I am doing right now is:
- Query the whole index
- Define a window partitioned by systemname and filename
- Sort ascending by timestamp
- Add a row number
- Add the maximum row number in each window (max_rownumber)
- Keep only the rows where rownumber = 1 or rownumber = max_rownumber
This is very slow.
My second option was to sort ascending, add a row number and keep row 1, then do the same with a descending sort. This is as slow as the first approach, if not slower.
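To make the intended result of the second approach concrete, here is a minimal plain-Scala model of it on an in-memory collection, which can help check the expected output on a small sample before running on the cluster. It is only a sketch: the `Doc` case class and `FirstLastModel` object are hypothetical, it does not use Spark, and with equal timestamps the picked document may differ from what Spark returns.

```scala
// Hypothetical record type mirroring a few fields from the index.
case class Doc(systemname: String, filename: String, timestamp: Long, message: String)

object FirstLastModel {
  // For each (systemname, filename) group, return the earliest and latest doc by timestamp.
  // Mirrors "row number 1" over an ascending sort and over a descending sort.
  def firstAndLast(docs: Seq[Doc]): Map[(String, String), (Doc, Doc)] =
    docs.groupBy(d => (d.systemname, d.filename)).map { case (key, group) =>
      val asc = group.sortBy(_.timestamp)   // ascending: first element is the first doc
      val desc = group.sortBy(-_.timestamp) // descending: first element is the last doc
      key -> ((asc.head, desc.head))
    }
}
```

A group with a single document returns that document as both first and last, which matches the `rn === 1 || rn === max_rn` filter in the code below.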
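The code is as follows:
```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val ascFileWindow = Window.partitionBy($"systemname", $"filename").orderBy($"timestamp".asc)
val ds2_first = ds_filtered
  .withColumn("rn", row_number().over(ascFileWindow)) // row number per file, oldest first
  .withColumn("max_rn", max($"rn").over(ascFileWindow.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))
  .withColumn("lf_flag", when($"rn" === 1 || $"rn" === $"max_rn", 1).otherwise(lit(0)))
  .where($"lf_flag" === 1) // keep only the first and last row per file
```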
ES Version: 6.0.0
ES Spark Connector: elasticsearch-spark-20 6.0.0
Scala Version: 2.11.8
Spark Version: 2.2
If someone could help me with a solution, that would be really nice. I am out of options.
Not being able to query aggregations is starting to look like a dealbreaker for Elasticsearch, but I don't want to switch away from it.