Writing to multiple indices using the Spark connector

Is it possible to write to multiple indices in one bulk operation using the Spark connector provided by the ES-Hadoop library?

According to the documentation, you can specify a document key for writing to multiple indices. My understanding is that the document key will also be indexed as an additional field in the document, which might not be required. Basically, is it possible to do something like the following?

val game = Map("media_type"->"game","title" -> "FF VI","year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

val indices = Seq("game","book","music")

sc.makeRDD(Seq(game, book, cd)).saveToEs(indices)

Please let me know. Thank you.

To clarify, are you asking if we can write all three of those documents to three different indices, or are you wondering if each document can be written to a different index based on its contents?

I am asking if each of these documents can be written to a different index. I would prefer that it not be based on the document's contents, but I can compromise if that's not an option (since it adds an additional field to the index), because I don't see a way to do it based on how the API is defined as of today.

As it stands today, you can reference a field from your data in the index pattern, so that the field's value determines the final index name when each document is written, like so:

val game = Map("media_type"->"game","title" -> "FF VI","year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

sc.makeRDD(Seq(game, book, cd)).saveToEs("index-{media_type}")

Additionally, since many of these metadata fields are picked up from fields in the data itself, you can mark them to be ignored from the final document sent to Elasticsearch, like so:

val game = Map("media_type"->"game","title" -> "FF VI","year" -> "1994")
val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")

val conf = Map("es.mapping.exclude" -> "media_type")

sc.makeRDD(Seq(game, book, cd)).saveToEs("index-{media_type}", conf)

Thank you, James. I think the mapping exclusion is a great option; that solves one of my problems. I have a few follow-up questions:

  1. Would ingesting to multiple indices hurt performance compared to writing to a single index? The only reason we are doing this in the first place is that the data could be streamed to multiple indices. Each index could have a tiny number of documents, so instead of grouping records by index and making a round trip for each index, we want to bulk all of these together for higher throughput.
  2. Is this operation atomic? I.e., if a document destined for one index fails to index, would all the other documents in the same bulk request fail as well?
  3. If it is not atomic, would the API return which document(s) failed to index?
  4. I observed that you have to set es.index.auto.create to true in order to use this option to ingest to multiple resources; otherwise the library returns an error, irrespective of whether the index already exists in the ES cluster.

Look forward to your response.

@animageofmine

When writing to multiple indices, we rely on Elasticsearch's automatic creation of indices. This means that at write time we don't know where the primary shards are for each index, and thus cannot prioritize writing data to those nodes. Be aware that while the data is all sent in the same bulk request, that bulk request will be split by index and by shard, creating quite a lot of concurrent shard-bulk requests, which may mean that you have to use fewer concurrent tasks to write data in order to avoid flooding the bulk queues.
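
If you do start flooding the bulk queues, the usual levers are to lower the number of concurrent writer tasks and to shrink the per-task bulk size. A minimal sketch along those lines, reusing the media_type example from above (the specific values are illustrative, not recommendations):

import org.elasticsearch.spark._

val conf = Map(
  "es.mapping.exclude"    -> "media_type", // keep the routing field out of the stored documents
  "es.batch.size.entries" -> "500",        // max documents per bulk request, per task
  "es.batch.size.bytes"   -> "1mb"         // max size per bulk request, per task
)

// coalesce() reduces the number of partitions, and therefore the number of
// concurrent tasks each sending its own bulk requests to the cluster
sc.makeRDD(Seq(game, book, cd))
  .coalesce(1)
  .saveToEs("index-{media_type}", conf)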

The operation is not atomic. Some documents may succeed in being written while others may fail.

The bulk response includes a status for each document sent, which the connector parses to determine the correct course of action: if a document has been rejected because the cluster is too busy, we back off and retry it automatically a few times before throwing the error. In version 6.2 and up, there is a new error handler API that allows you to specify a function to intercept these errors and determine what the connector should do.

As mentioned above, multi index writes rely on Elasticsearch's automatic index creation. If this setting is set to false, it means that you as a user do not want anything to create a new index during the run. This conflicts with the use of multi-index writes, and thus we throw a validation error.

When writing to multiple indices, we rely on Elasticsearch's automatic creation of indices. This means that at write time we don't know where the primary shards are for each index, and thus cannot prioritize writing data to those nodes. Be aware that while the data is all sent in the same bulk request, that bulk request will be split by index and by shard, creating quite a lot of concurrent shard-bulk requests, which may mean that you have to use fewer concurrent tasks to write data in order to avoid flooding the bulk queues.

Would the bulk request be split inside the ES cluster? I am assuming that would still be faster than sending multiple smaller bulk requests from the Spark job, because of the additional round trips those would require.

The bulk response includes a status for each document sent, which the connector parses to determine the correct course of action: if a document has been rejected because the cluster is too busy, we back off and retry it automatically a few times before throwing the error. In version 6.2 and up, there is a new error handler API that allows you to specify a function to intercept these errors and determine what the connector should do.

Did you mean that the new error handler that allows us to intercept these errors is available starting in 6.2? If so, is the old error handling available in 5.3 (that's the version we are on)? Is there anything I should be considering because of the version difference?

As mentioned above, multi index writes rely on Elasticsearch's automatic index creation. If this setting is set to false, it means that you as a user do not want anything to create a new index during the run. This conflicts with the use of multi-index writes, and thus we throw a validation error.

We don't want dynamic mapping. We create the index mappings before we ingest anything, and we don't want to ingest any garbage that might be pumped into our workflow (it's a dependency issue). Is that possible? I tried disabling index creation in the cluster by updating elasticsearch.yml and setting es.index.auto.create to true in the Spark job. It seems to work, except that the error returned by the API does not tell you which index it failed to write to (it just says the index does not exist).

Yes. One bulk request is sent per task, and that bulk request is split inside the Elasticsearch cluster.

The error handler API is new in 6.2. In version 5.3 the connector still automatically retries bulk rejections due to busy nodes, but the behavior beyond that is not pluggable like it is in 6.2+.
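
For reference, those automatic retries are tunable through configuration in 5.x; only the pluggable handlers are new in 6.2. A sketch of the relevant settings (the values are illustrative, and the commented-out handler keys apply to ES-Hadoop 6.2+ only, so check them against the error handler documentation for your exact version):

val conf = Map(
  "es.batch.write.retry.count" -> "3",   // how many times a rejected bulk document is retried
  "es.batch.write.retry.wait"  -> "10s"  // how long to back off between retries
  // ES-Hadoop 6.2+ only (assumed key names for the built-in "log" handler):
  // "es.write.rest.error.handlers"                -> "log",
  // "es.write.rest.error.handler.log.logger.name" -> "BulkFailures"
)

sc.makeRDD(Seq(game, book, cd)).saveToEs("index-{media_type}", conf)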

For this scenario, as defined in the docs, you should be able to set action.auto_create_index in your Elasticsearch node settings to disable automatic index creation, as well as set index.mapper.dynamic to false in your Elasticsearch node settings to disable automatic field mapping.
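
Putting that together with the earlier examples, the job-side configuration stays as before; only the cluster-side settings change. A sketch, assuming the target indices (index-game, index-book and index-music in the running example) have already been created with explicit mappings:

// Cluster side (elasticsearch.yml): action.auto_create_index: false, index.mapper.dynamic: false
val conf = Map(
  "es.index.auto.create" -> "true",       // still required by the connector for multi-index writes
  "es.mapping.exclude"   -> "media_type"  // keep the routing field out of the stored documents
)

sc.makeRDD(Seq(game, book, cd)).saveToEs("index-{media_type}", conf)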

Sounds good. I will try your suggestions. Just one clarification: when you talk about version 6.2, do you mean the ES Spark library 6.2 or Elasticsearch version 6.2? Can I just upgrade the ES Spark library to 6.2 without upgrading Elasticsearch from 5.4 to 6.2?

In this case, the error handling code resides in ES-Hadoop/Spark, so the "6.2" version described corresponds to ES-Hadoop/Spark.

ES-Hadoop 6.2 should be backwards compatible with 5.4. If you run into any problems getting the versions to work together, please let me know, as I would classify that as a compatibility bug.
