How to use the Elasticsearch Hadoop connector to create a type automatically from a streaming Dataset<Row>?

I have the following code.

df
        .writeStream()
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1000))
        .format("org.elasticsearch.spark.sql")
        .option("checkpointLocation", "/tmp")
        .start("test/hello");

I want the test index and hello type to be created automatically from my DataFrame df. I don't have the test index or the hello type created prior to running my Spark streaming job.

When I run the code above, I get the following error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [127.0.0.1:9200] returned Not Found(404) - type[hello] missing; Bailing out..

Is changing to Dataset<T> by using an Encoder not an option?
I do something similar and can highly recommend creating the Elasticsearch index in advance, especially when you have time-based fields in your data, like a long that stores a timestamp as an epoch value. Without creating the index in advance, Elasticsearch does not know that this field should be treated as an @timestamp for doing timeline analysis.
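For example, something along these lines creates the index and the date mapping up front (just a rough sketch: it assumes the Elasticsearch low-level REST client with the 6.4+ Request API, a cluster on localhost:9200, and a made-up epoch field called eventTime):

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class CreateIndexUpFront {
    public static void main(String[] args) throws Exception {
        RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
        // Create the "test" index with the "hello" type, mapping the epoch long explicitly as a date.
        Request createIndex = new Request("PUT", "/test");
        createIndex.setJsonEntity(
                "{ \"mappings\": { \"hello\": { \"properties\": {"
              + "    \"eventTime\": { \"type\": \"date\", \"format\": \"epoch_millis\" }"
              + "} } } }");
        client.performRequest(createIndex);
        client.close();
    }
}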

Hi! Sure. I do want to create the index and type in advance, but I want them to be created at runtime. Say I get a stream of messages and apply some transformation which results in a Dataset. This result Dataset will have a schema, so I want to take that schema, create an index and type while mapping to the equivalent ES types, and then start appending docs.

I can do df.select(to_json(functions.struct(col("*"))).as("value")), which gives me JSON that I want to write as a doc in Elasticsearch.
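For reference, this is roughly what that looks like in full (a minimal sketch using the standard Spark SQL functions):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

// Collapse all columns of each row into a single JSON string column named "value".
Dataset<Row> jsonDf = df.select(to_json(struct(col("*"))).as("value"));
jsonDf.printSchema(); // value: string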

Does that make sense? How can I achieve it?

Can you please post the printSchema output of your transformation result?
(You have to change the reading from streaming to a regular batch read to do this.)
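For example, something like this (a sketch; the broker address and topic name are placeholders, and you would apply the same transformation before calling printSchema):

// Read the same Kafka topic as a regular batch Dataset<Row> just to inspect the schema.
Dataset<Row> batchDf = spark
        .read()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "my-topic")
        .load();
batchDf.printSchema();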

BTW, I'm missing the option "es.index.auto.create" set to true.

Hi,

The original schema is pretty long so I will paste a sample for this discussion.

col1: DoubleType
col2: MapType
col3: LongType
col4: DateType
col5: TimestampType

What do you mean by streaming to regular style? What I have is streaming data from Kafka, and I somehow have to store that data in ES.

Maybe it makes sense to reduce your current code and data to a simpler sample that could be posted here?
Have you added the es index option I posted above?

It's pretty much like the code above, except I added the option you suggested, so it looks like this:

Dataset<Row> df = kafkaDF.map(row -> { /* transformation elided */ }, rowEncoder); // map in the Java API also needs an Encoder<Row>; rowEncoder is a placeholder

df.printSchema();

df.dtypes(); // gives the DataFrame schema; I need to use this to create a mapping for an ES type

df
        .writeStream()
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1000))
        .format("org.elasticsearch.spark.sql")
        .option("es.index.auto.create", true)
        .option("checkpointLocation", "/tmp")
        .start("test/hello");

But in my case I want to create the index and type and POST its mapping before writing any docs. I had expected the elasticsearch-hadoop connector to do this, since it has the DataFrame schema information.

The only difference I can see from my code, which works with both DataFrame and Dataset, is that I do not set .outputMode("append").
And I created the index in advance using Kibana Dev Tools.
Sorry that I cannot help further.

OK, is there any way I can convert a Spark DataFrame schema to an Elasticsearch mapping using the elasticsearch connector library?

I still do not understand why you don't simplify your DF, for example by doing a select on some of the fields, and try to writeStream to ES.
On my system I get the index and type created by writing to Elasticsearch. I made an additional test by deleting my already existing index; it worked as expected. You can see the index and type using Dev Tools and change types via mappings.
It could be an issue with one of the types in your schema or something else, but as long as you keep it the same way, I think you will get stuck.
What version is your Elasticsearch server? Are credentials (X-Pack) installed and enabled?

Is there any way I can convert a Spark DataFrame schema to an Elasticsearch mapping using the elasticsearch connector library?

The connector does not have this capability at this time. We've tossed the idea around a bit, but felt that for most cases automatic mapping is sufficient, and in the cases where it is not, more advanced properties need to be set that can't be derived from a Spark schema alone (like analyzers, or complex fields such as nested docs or geo shapes/coordinates). In those cases we advise users to build out their mappings beforehand. In the case that they need the index created at write time, we advise using mapping templates in ES to define the mapping, which gets picked up by matching the index name at write time.
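For example, a template along these lines gets applied whenever an index matching the pattern is created at write time (a sketch only: it uses the low-level REST client's 6.4+ Request API, the index_patterns syntax is the 6.x form (older versions use "template"), and col4/col5 are just the date/timestamp columns from your sample schema):

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class PutIndexTemplate {
    public static void main(String[] args) throws Exception {
        RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
        // Any index created later whose name matches "test*" picks up this mapping.
        Request putTemplate = new Request("PUT", "/_template/test_template");
        putTemplate.setJsonEntity(
                "{ \"index_patterns\": [\"test*\"],"
              + "  \"mappings\": { \"hello\": { \"properties\": {"
              + "    \"col4\": { \"type\": \"date\" },"
              + "    \"col5\": { \"type\": \"date\" }"
              + "} } } }");
        client.performRequest(putTemplate);
        client.close();
    }
}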

Hi James! What do you mean by auto mapping (since there seem to be many definitions)?

The reason I asked about converting a Spark DataFrame schema to an Elasticsearch mapping is that I want to avoid dynamic mapping, as recommended by the Elasticsearch website itself. But with auto mapping, if the Spark elasticsearch-hadoop connector guarantees that the types of the fields inside a document will be the same as, or equivalent to, the Spark DataFrame types, then that's all I need. I just want to avoid dynamic mapping.

For example:

Say I have a DataFrame with 3 columns col1, col2, col3, where all three columns are int. It would look like this:

inputDf

col1 | col2 | col3
----------------------
 1   |  5   | 6

and say I run a query like below

inputDf.selectExpr("col1", "col2 / 2.0 as col2", "col3")

I should get a resultdf like below

col1 | col2 | col3
----------------------
 1   |  2.5 | 6

Now say I want to write resultdf to index test and type hello to ES.

 resultdf
        .writeStream()
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1000))
        .format("org.elasticsearch.spark.sql")
        .option("es.index.auto.create", true)
        .option("checkpointLocation", "/tmp")
        .start("test/hello");

Do the fields col1, col2, col3 of all documents of type hello have the exact same data type? In this case it should be int, double, int respectively.

If not, I want to create a mapping before I write documents in a streaming fashion, but for that it would be nice to have a utility function in the elasticsearch-hadoop connector that can give me an Elasticsearch mapping given a DataFrame schema.
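Something like this is roughly what I am imagining, written by hand for now (purely hypothetical; it is not part of es-hadoop, and it only covers a few simple Spark types):

import org.apache.spark.sql.types.*;

// Hypothetical helper: derive a rough Elasticsearch "properties" block from a Spark schema.
// Anything not listed falls back to "keyword"; complex types (maps, structs, arrays) are not handled.
static String esPropertiesFor(StructType schema) {
    StringBuilder sb = new StringBuilder("{\"properties\":{");
    StructField[] fields = schema.fields();
    for (int i = 0; i < fields.length; i++) {
        DataType t = fields[i].dataType();
        String esType =
              t instanceof IntegerType ? "integer"
            : t instanceof LongType    ? "long"
            : t instanceof DoubleType  ? "double"
            : t instanceof BooleanType ? "boolean"
            : t instanceof TimestampType || t instanceof DateType ? "date"
            : "keyword";
        if (i > 0) sb.append(",");
        sb.append("\"").append(fields[i].name()).append("\":{\"type\":\"").append(esType).append("\"}");
    }
    return sb.append("}}").toString();
}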

My apologies, I do indeed mean dynamic mapping here.

I think you'll find that dynamic mapping is more than sufficient to detect those field types and apply them accordingly. That said, we have talked in the past about using a spark schema to create an index mapping if one does not exist and auto create is specified.

While this seems like a good idea at first, there are some drawbacks to consider when working with field types that Elasticsearch is familiar with but Spark is not (e.g. geo coordinates, nested document structures, parent-child documents, etc.).

Using these field types would require the index mapping to be specified on the Elasticsearch side already anyway, and they are the most common fields to not be sensed correctly. For most other cases we find that dynamic mapping is sufficient to detect that a field is numeric or decimal in nature, as in your example.

In the interest of keeping the connector code simplified we have opted to depend on dynamic mapping for most use cases, or to depend on user forethought to handle when dynamic mapping is inadequate, either by pre-creating the index, or using a mapping template. This may change in the future as the connector continues to evolve.

Hi James!

Could you please explain why you think dynamic mapping will be sufficient? If so, why is it recommended to turn off dynamic mapping in production? It sounds to me like most people operate ES production clusters with dynamic mapping disabled.

Can you describe what would happen in this simple case and why dynamic mapping won't be a problem?

inputdf

col1  | col2 | col3
-------------------
  1   |   2  | 5
  2   |   3  | 6

Say I do something like this (Pseudo sql)

select col2/2 from inputdf and write it to ES

the first document in ES will be {"col2": 1} and the second document will be {"col2": 1.5}, so wouldn't ES throw a type mismatch error here?

Dynamic mapping is not without its faults. It can misinterpret non-homogeneous data, as well as data that is typically serialized as strings. This is often why it is disabled in production, where users would like to have complete and explicit control over the mappings. I fail to see how this is all that different from creating a mapping based on a Spark SQL schema, as ES mappings and Spark SQL schemas do not form a perfect 1-to-1 relationship with each other for more advanced Elasticsearch fields (like nested values, geo, parent-child), which would still see you creating the Elasticsearch mapping beforehand.

Speaking strictly from an ES-Hadoop/Spark integration standpoint: since the Spark schema for col2 will be changed to a double type, the serialization code will serialize the 1 value as a double field, complete with a trailing decimal place (1.0), with the next document's value also being treated as a double (1.5). Dynamic mapping should allocate doubles for the mapping of this field, as it is the safest value type to allocate for decimal numerics.
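To make that concrete with your example (a small sketch against your inputdf):

Dataset<Row> result = inputDf.selectExpr("col1", "col2 / 2 as col2", "col3");
result.printSchema(); // col2 is now a double in the Spark schema
// The connector then serializes col2 as a double in both documents:
// {"col1": 1, "col2": 1.0, "col3": 5} and {"col1": 2, "col2": 1.5, "col3": 6}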

In short, while we do use the Spark SQL schema to inform the connector how best to serialize a field, we do not use it to automatically create the mapping in Elasticsearch as the main pitfalls that you may experience with dynamic mapping are still likely to be experienced with that proposed feature.

Hi James!

That makes some sense now! Also, what do I need to do to enable dynamic mapping?

  1. Do I need to set

sparkConf.set("es.index.auto.create", "true")

or

resultdf
        .writeStream()
        .outputMode("append")
        .format("org.elasticsearch.spark.sql")
        .option("es.index.auto.create", true)
  2. I tried both of the options, but when I go to http://localhost:9200/test/_settings (test is my index name) it says:

{"test":{"settings":{"index":{"number_of_shards":"5","provided_name":"test","mapper":{"dynamic":"false"}}

I feel like I am missing another option that can help me enable index.mapper.dynamic: true

I know I can make a separate PUT call to enable index.mapper.dynamic: true, but I want to do this from the es-hadoop connector.

Unfortunately you will need to set the dynamic mapping up on the Elasticsearch side. The connector is not really designed to set properties in Elasticsearch. If you would like to do this from Java/Scala, you could download the Low Level Java Rest Client and perform the requests through there.
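For illustration, the PUT you mentioned could look roughly like this with the low-level REST client (a sketch: the Request API assumes a 6.4+ client, and whether index.mapper.dynamic can be changed on an existing index depends on your Elasticsearch version, so setting it in the index creation body is the safer route):

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class EnableDynamicMapping {
    public static void main(String[] args) throws Exception {
        RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
        // Flip the index-level setting that the _settings output above showed as "false".
        // NOTE: if your ES version rejects updating this setting on a live index,
        // include it in the index creation request instead.
        Request updateSettings = new Request("PUT", "/test/_settings");
        updateSettings.setJsonEntity("{ \"index.mapper.dynamic\": true }");
        client.performRequest(updateSettings);
        client.close();
    }
}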
