Spark, read data from ES, how to specify fields?


(Thomas Decaux) #1

I am trying to read some data into Spark from a remote Elasticsearch cluster:

JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "es_articles", "?fields=title");

It doesn't seem to be implemented the way "q" is.

Is there any way to select which fields are returned? (I need this for network performance and as a workaround for datetime values that are not in ISO format.)

Thanks,


(Costin Leau) #2

You cannot apply a projection this way, since "fields" is also used internally by the connector. For fine-grained control over the mapping, consider using DataFrames, which are basically RDDs plus a schema.


(Ashish Belokar) #3

Here is what I did to get specific fields:

QueryBuilder query = QueryBuilders.matchAllQuery();
List<String> fields = new ArrayList<String>();
fields.add("field1");
fields.add("field2");
JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(
        jsc,
        "index/type",
        new SearchSourceBuilder().query(query).fields(fields).toString());
System.out.println(esRDD.take(1));


(Costin Leau) #4

Using Elasticsearch to create such a basic query (to select 1-2 fields) is just wasteful. Simply add "fields" to the query as indicated here.
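As a rough illustration of that suggestion, the query body can be assembled as a plain string with no Elasticsearch client classes involved. This is only a sketch: the class and method names are made up for the example, field names are assumed to need no JSON escaping, and a later post in this thread reports that the connector may still request every field, so the effect depends on the connector version.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FieldsQuery {
    // Hypothetical helper: builds a match_all query body that asks for only the given fields.
    // Assumption: field names contain no characters that would need JSON escaping.
    static String matchAllWithFields(List<String> fields) {
        String quoted = fields.stream()
                .map(f -> "\"" + f + "\"")
                .collect(Collectors.joining(","));
        return "{\"fields\":[" + quoted + "],\"query\":{\"match_all\":{}}}";
    }

    public static void main(String[] args) {
        String query = matchAllWithFields(Arrays.asList("field1", "field2"));
        System.out.println(query);
        // The string would then replace the SearchSourceBuilder output, e.g.
        // JavaEsSpark.esRDD(jsc, "index/type", query);
    }
}
```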

I'll reiterate my point though: an RDD with a schema is a Spark DataFrame. That provides not just fine-grained control over the underlying structure but also pushed-down operations, that is, the connector translating the SQL into an actual ES query.
This documentation section provides more information.

Using an RDD while trying to select fields and the like will not only reinvent parts of Spark SQL that are already available, but also provide only a subset of them and ignore all the other optimizations available.


(Ashish Belokar) #5

Sounds like a great explanation for preferring DataFrames over RDDs. But I already have the ES queries created. I guess in such cases the es.mapping.include and es.mapping.exclude properties in the configuration must be used. However, this makes the configuration object specific to a particular index. I think I will have to move to DataFrames eventually!


(Mindon) #6

In Scala I do it like this:

sc.esRDD("somedoc/sometype", "?q=something", Map[String, String]("es.read.field.include"->"field1,field2,..."))
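
For Java users, the same per-read settings map can be built without touching the shared configuration, which avoids making that configuration specific to one index. This is a minimal sketch: ReadSettings and withFields are hypothetical names, the node address is a placeholder, and it assumes the JavaEsSpark.esRDD overload that accepts a settings map.

```java
import java.util.HashMap;
import java.util.Map;

class ReadSettings {
    // Hypothetical helper: copies the shared settings and adds a field filter for one read,
    // leaving the shared map untouched.
    static Map<String, String> withFields(Map<String, String> shared, String... fields) {
        Map<String, String> cfg = new HashMap<>(shared);
        cfg.put("es.read.field.include", String.join(",", fields));
        return cfg;
    }

    public static void main(String[] args) {
        Map<String, String> shared = new HashMap<>();
        shared.put("es.nodes", "localhost:9200"); // placeholder address

        Map<String, String> cfg = withFields(shared, "field1", "field2");
        System.out.println(cfg);
        // cfg would then be passed for this read only, e.g.
        // JavaEsSpark.esRDD(jsc, "somedoc/sometype", "?q=something", cfg);
    }
}
```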

(Thomas Decaux) #7

Yeah, but there seems to be a bug with nested fields.

I run:

scala> val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(Map("es.scroll.limit" -> "100000", "es.read.field.include" -> "client.hash,client.token,name")).load("events-prod2/events")
df: org.apache.spark.sql.DataFrame = [client: struct<hash:string,token:string>, name: string]

Then I get:

scala> df.first
res3: org.apache.spark.sql.Row = [[null,null],search]

(Thomas Decaux) #8

Running:

sqlContext.read.format("org.elasticsearch.spark.sql").options(Map("es.scroll.limit" -> "100", "es.query" -> """{"fields" : ["client.hash"], "query" : {"match_all" : {}}}""")).load("events-prod2/events")

This gives me all the fields, because es-hadoop sends every field in the _source query parameter (as seen through an HTTP proxy).

With version 2.4.0 I got this error:

Field 'client.hash' is backed by an array but the associated Spark Schema does not reflect this


(Thomas Decaux) #9

Thank you, this is the only way I managed to get working!

@costin I believe it is very important to be able to select fields, because it saves bandwidth, doesn't it?

In my case, I use it to perform terms aggregations and get ALL the results (plus some joins).

So I download from ES via an RDD (because I can select fields), transform it into a DataFrame, and run the SQL query at the end!

