Use cases Elasticsearch and Spark

(Alex Budniy) #1


In our company we want to add analytics to our product so that we can show different kinds of reports to customers.
Currently we store the following kinds of data in Elasticsearch, in separate indexes:

  1. basic info about connected devices, such as MAC address, device OS, etc.
  2. attributes of logged-in users: gender, age, etc.

More data will probably go into other separate indexes.

Since the data is in separate indexes, we can't really use ES analytics capabilities out of the box. The options are:

  1. Run batch jobs (or process in real time) to create more indexes that consolidate data from the separate indexes, and query these new indexes. The problem is that not all data can go into one index, because it is not always parent-child; it can be quite unrelated data that is still useful for reports. This would require some number of indexes with combined data, and because the reports are flexible it is hard to say how many or of what kind.
  2. Use Spark to read data from the indexes and build reports using map-reduce.

I have a feeling that it is a mistake not to use all the features of ES, but I don't really see any other way to support flexible reports when using separate indexes. Any comments or advice will be greatly appreciated!

As for using Spark, my impression is that it is a bit slow for fast, real-time analytics, because it takes a lot of time to read the raw data from ES into Spark. Where ES spends a fraction of a second aggregating the data, Spark spends 5-10 seconds just reading the raw data! After that, the map-reduce itself is obviously very fast.

But there is no way to pre-process the data in ES before reading it into Spark. As far as I can tell, with JavaEsSpark.esRDD it is only possible to do basic filtering using Query String syntax, for example specifying a start and end date and some fields to filter on. Even so, in my case the raw session data for a single day contains ~160K docs. I also can't specify that I need only a few fields instead of the whole document.

SparkSQL can push filters down to ES, which is exactly what I would like to use, but in my experience SparkSQL is not mature yet and translates to quite strange ES queries. For example, the simple expression "where hotspot_id in (234, 645, 534)" generates a completely invalid query. I wish I knew Scala; I would be happy to fix this kind of bug!

Sorry for the amount of information. I would really appreciate any recommendations on how to deal with all of this and how to do analytics in a better way.

Kind regards,

(Costin Leau) #2


Have you tried running aggregations across multiple indices? If that's not suitable, you can try creating synthetic indices that contain just the data from across the indices that is relevant for analysis.

You are right that moving data, especially lots of it, across systems is expensive. Unless the calculation / operation itself is expensive, loading and saving the data can become the main overhead, as they take significantly longer than the processing itself.

Regarding the RDD, you can specify a simple query string but also a full-blown Query DSL, as indicated here.
Indeed, there's no public API to limit the number of fields loaded (though this is possible internally), which is why I raised an issue for it; feel free to comment on or track it.
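
As a rough illustration of the Query DSL option mentioned above, the sketch below builds a query body that restricts the read to a date range and a single hotspot. The class name, the method name and the "started_at" date field are hypothetical; only "hotspot_id" comes from this thread. The bool/filter form assumes Elasticsearch 2.0 or later (older versions expressed this with a filtered query). In a Spark job the resulting string would be passed as the query argument of JavaEsSpark.esRDD(jsc, resource, query).

```java
// Hypothetical helper, not part of elasticsearch-hadoop.
final class SessionQuerySketch {

    // Builds a Query DSL body matching documents whose date field lies in
    // [from, to) and whose hotspot_id equals the given id.
    // "dateField" is an assumed field name; adapt it to the real mapping.
    static String sessionsBetween(String dateField, String from, String to, long hotspotId) {
        return "{\"query\":{\"bool\":{\"filter\":["
             + "{\"range\":{\"" + dateField + "\":{\"gte\":\"" + from + "\",\"lt\":\"" + to + "\"}}},"
             + "{\"term\":{\"hotspot_id\":" + hotspotId + "}}"
             + "]}}}";
    }

    public static void main(String[] args) {
        // One day of sessions for one hotspot (values are illustrative only).
        System.out.println(sessionsBetween("started_at", "2015-10-01", "2015-10-02", 234L));
    }
}
```

Filtering on the Elasticsearch side this way reduces the number of documents shipped to Spark, although each matching document is still transferred whole.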

As for the SparkSQL to Query DSL translation, please create an issue with the SQL queries that are not properly translated and we'll get it sorted out. Of course, a patch would be most welcome, but a bug report in the first place (and a good one, like this post) is half the job done.

Don't worry about too much information. Posts like this are a pleasure to read since they offer insight into user needs beyond the to-the-point "how can I do X".


(Alex Budniy) #3

Hello Costin,

Thanks for your reply. I will have a look at multiple indices. Also, currently we are thinking of using separate indices for particular reports. Actually, using ES and Spark it is very easy to read data from a couple of indices, transform-combine the data and write it to a new index.
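
The read-combine-write flow described above can be sketched without a cluster. In a real job the two inputs would come from JavaEsSpark.esRDD (which yields pairs of document id and field map), the merge would be a pair-RDD join, and the result would be written with JavaEsSpark.saveToEs. Here plain maps stand in for all of that. The assumption that both indices share document ids, the class name and the field names are all hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a Spark join between two indices.
final class CombineIndicesSketch {

    // Inner-joins two "id -> document" maps and merges the fields of
    // documents that exist on both sides. On a field-name clash the
    // user value overwrites the device value.
    static Map<String, Map<String, Object>> combine(
            Map<String, Map<String, Object>> devices,
            Map<String, Map<String, Object>> users) {
        Map<String, Map<String, Object>> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Object>> entry : devices.entrySet()) {
            Map<String, Object> user = users.get(entry.getKey());
            if (user == null) {
                continue; // no matching user document: skipped (inner join)
            }
            Map<String, Object> doc = new HashMap<>(entry.getValue());
            doc.putAll(user);
            combined.put(entry.getKey(), doc);
        }
        return combined;
    }

    public static void main(String[] args) {
        Map<String, Object> device = new HashMap<>();
        device.put("device_os", "android"); // assumed field name
        Map<String, Object> user = new HashMap<>();
        user.put("gender", "f");            // assumed field name

        Map<String, Map<String, Object>> devices = new HashMap<>();
        devices.put("s1", device);
        Map<String, Map<String, Object>> users = new HashMap<>();
        users.put("s1", user);

        // Each combined document would become one document in the report index.
        System.out.println(combine(devices, users));
    }
}
```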

As for SparkSQL query - I raised a bug, please have a look:

Thanks for your work, Costin. I think ES-Spark is a cool and useful connector.

Kind regards,

(Alex Budniy) #4

Oh, I found the fix for this issue after looking at the sources. I had to enable strict pushdown like this:

conf.set("es.internal.spark.sql.pushdown.strict", "true");

After that "IN" queries are generated properly.
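
For reference, a well-formed Elasticsearch filter for the IN expression from the first post would presumably be a terms query. The sketch below only shows the shape of that query; it is a hypothetical helper, not the connector's translation code, and the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of elasticsearch-hadoop.
final class InFilterSketch {

    // Renders a SQL-style "field IN (v1, v2, ...)" predicate over numeric
    // values as an Elasticsearch terms query.
    static String termsQuery(String field, List<Long> values) {
        String joined = values.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
        return "{\"terms\":{\"" + field + "\":[" + joined + "]}}";
    }

    public static void main(String[] args) {
        // where hotspot_id in (234, 645, 534)
        System.out.println(termsQuery("hotspot_id", Arrays.asList(234L, 645L, 534L)));
    }
}
```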

(Costin Leau) #5

Copying here the reply from the issue just to make sure folks reading this thread will get the proper answer:

The es.internal. prefix is reserved for internal configuration and is not supported. If I understand correctly, when you enable strict pushdown the filter is created properly. However, the default is not strict (meaning it works with analyzed rather than not-analyzed filters).

Long story short: first, you can change the strict aspect through a proper setting that will be portable across settings. Second, when not using strict pushdown, the query appears to be invalid, which is a bug.
