Staging Data for Elasticsearch bulk loading


(Jonathan Spooner) #1

I'm looking for the best storage format to stage data for Elasticsearch ingestion. By best I mean the format that requires little or no Spark processing time: ideally the Spark job would just load the data and push it directly to Elasticsearch.

Why do I want a Spark job that just loads data into Elasticsearch? I have a 3 TB dataset and I'd like to load a small percentage of it multiple times so we can test different ES configurations and performance. Once we have a configuration we're pleased with, we'll likely run a daily job that summarizes the data and pushes it to Elasticsearch.

I currently have an aggregation job that converts raw events into summaries and writes them to S3 in Parquet format. A second job then reads that data and transforms it into Elasticsearch bulk format before pushing it with es-hadoop.

Example Job that pushes to elasticsearch:

import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.Metadata._

val dataframe = sqlContext.read.parquet(path)

// Parent documents: one per distinct device, metadata only.
val myDF = dataframe.select("device_id").distinct
val parentRDD = myDF.map( x =>
  (
    Map(ID -> x(0).toString.trim),
    Map()
  )
)

// Child documents: ID and PARENT metadata plus the document body.
val childrenRDD = dataframe.map( x =>
  (
    Map(
      ID -> x(0),
      PARENT -> x(1)
    ),
    Map(
      "foo" -> x(0),
      "bar" -> x(2) // originally the key "foo" again, which would overwrite the first entry
    )
  )
)

EsSpark.saveToEsWithMeta(parentRDD, "development/parent")
EsSpark.saveToEsWithMeta(childrenRDD, "development/child")

My problem with this example is that the map step can take a considerable amount of time. Ideally I'd like to stage parentRDD and childrenRDD to files, but I'm not sure what the best format would be.
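One way to avoid the transformation at load time is to render each document to a single JSON line during the staging job, so the load job only reads strings. A minimal sketch of such a renderer (field names and the S3 path are illustrative; real code should use a proper JSON library such as json4s or Jackson for escaping):

```scala
// Render one document as a single JSON line. Escaping is kept
// minimal for illustration: only embedded double quotes are handled.
def toJsonLine(fields: Map[String, String]): String =
  fields
    .map { case (k, v) => "\"" + k + "\":\"" + v.replace("\"", "\\\"") + "\"" }
    .mkString("{", ",", "}")

// In the staging job this would run once per document, e.g.:
//   childrenRDD.map(toJsonLine).saveAsTextFile("s3://bucket/staged/children")
// leaving the load job with nothing to do but read lines and push them.
val line = toJsonLine(Map("device_id" -> "abc-123", "foo" -> "42"))
```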

I like using es-hadoop for pushing to Elasticsearch because of its configuration options for bulk request size and retry policy, but I'd be open to using something else.


(James Baiera) #2

My advice in this scenario would be to store the data already formatted as JSON and to use the es.input.json property to load the data in. Make sure that you only have one document per line in the file. There's a section in our docs that covers how to do this.

Loading with es.input.json enabled minimizes the amount of the serialization work ES-Hadoop has to do. For your use case, it allows you to remove some processing time from Spark during indexing.
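A minimal sketch of such a load job, assuming the documents were staged one JSON object per line (the S3 path is hypothetical; the property names are real ES-Hadoop settings, the values illustrative). EsSpark.saveJsonToEs is the Spark-native way to hand over pre-serialized JSON strings:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

val conf = new SparkConf()
  .setAppName("es-staged-load")
  .set("es.input.json", "true")           // records are already JSON strings
  .set("es.batch.size.bytes", "10mb")     // bulk request size
  .set("es.batch.write.retry.count", "3") // retry policy
val sc = new SparkContext(conf)

// Each line of the staged files is one complete document, so no
// per-record transformation runs here before indexing.
val docs = sc.textFile("s3://bucket/staged/children/*") // hypothetical path
EsSpark.saveJsonToEs(docs, "development/child")
```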

As for file format, you're really just looking for a format that has good IO performance. Parquet should do just fine.
