I want the test index and hello type to be created automatically from my DataFrame df. I don't have the test index or the hello type created prior to running my Spark streaming job.
So when I run the code above I get the following error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [127.0.0.1:9200] returned Not Found(404) - type[hello] missing; Bailing out..
Is changing to a Dataset<T> by using an Encoder not an option?
I do something similar and can highly recommend creating the Elasticsearch index in advance, especially when you have time-based fields in your data, like a long that stores a timestamp as an epoch value. Without creating the index in advance, Elasticsearch does not know that this field should be treated as an @timestamp for timeline analysis.
Hi! Sure. I do want to create the index and type in advance, but I want them to be created at runtime. Say I get a stream of messages and apply some transformations that result in a Dataset. That resulting Dataset will have a schema, right? So I want to take that schema, create an index and type while mapping the fields to equivalent ES types, and then start appending docs.
I can do df.select(to_json(functions.struct(col("*"))).as("value")), which gives me a JSON string that I want to write as a doc in Elasticsearch.
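As a trimmed-down sketch with the needed imports (df here stands in for my Kafka-sourced DataFrame, so the names are placeholders):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Pack every column of each row into a struct and serialize it as a single JSON string column
Dataset<Row> jsonDf = df.select(to_json(struct(col("*"))).as("value"));
jsonDf.printSchema(); // value: string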
Maybe it makes sense to reduce your current code and data to a simpler sample that could be posted here?
Have you added the es index option I posted above?
It's pretty much the code above, except I added the option you suggested, so it looks like this:
Dataset<Row> df = kafkaDF.map(row -> { /* transformation elided */ });
df.printSchema();
df.dtypes(); // gives the DataFrame schema; I need to use this to create the mapping for an ES type

df.writeStream()
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(1000))
  .format("org.elasticsearch.spark.sql")
  .option("es.index.auto.create", true)
  .option("checkpointLocation", "/tmp")
  .start("test/hello");
But in my case I want to create the index and type and POST its mapping before writing any docs. I had expected the elasticsearch-hadoop connector to do this, since it has the DataFrame schema information.
The only difference I can see from my code, which works with both DataFrame and Dataset, is that I do not set .outputMode("append").
And I created the index in advance using Kibana Dev Tools.
Sorry that I cannot help.
I still do not understand why you don't simplify your DF, for example by doing a select on some of the fields, and try to writeStream that to ES.
On my system I get the index and type created by writing to Elasticsearch. I made an additional test by deleting my already existing index, and it worked as expected. You can see the index and type using Dev Tools and change the types via mappings.
It could be an issue with a type in your schema or something else, but as long as you keep it the same way, I think you will stay stuck.
What version is your Elasticsearch server? Are credentials (X-Pack) installed and enabled?
Is there any way I can convert a Spark DataFrame schema to an Elasticsearch mapping using the elasticsearch-hadoop connector library?
The connector does not have this capability at this time. We've tossed the idea around a little bit but felt that for most cases the automatic mapping is sufficient, and in cases where it is not, more advanced properties should be set for mappings that can't be provided by a Spark schema alone (like analyzers or complex fields like nested docs or geo shapes/coordinates). In these cases we advise users to build out their mappings beforehand. In the case that they need the index created at write time, we advise using mapping templates in ES to define the mapping that gets picked up by matching the index name at write time.
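For reference, a rough sketch of that template approach using the Low Level Java REST Client might look like the following. The template name, index pattern, and field types are placeholders I picked for the test/hello example from this thread, and the exact template syntax and client API depend on your Elasticsearch version (5.x uses "template" instead of "index_patterns"), so treat it as a starting point rather than connector behavior:

import java.util.Collections;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.RestClient;

public class CreateTemplateSketch {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()) {
            // Legacy index template: the mapping below is applied whenever an index
            // matching "test*" is created, including auto-creation by the connector.
            String body =
                "{"
                + "  \"index_patterns\": [\"test*\"],"
                + "  \"mappings\": {"
                + "    \"hello\": {"
                + "      \"properties\": {"
                + "        \"col1\": { \"type\": \"integer\" },"
                + "        \"col2\": { \"type\": \"double\" },"
                + "        \"col3\": { \"type\": \"integer\" }"
                + "      }"
                + "    }"
                + "  }"
                + "}";
            HttpEntity entity = new NStringEntity(body, ContentType.APPLICATION_JSON);
            client.performRequest("PUT", "/_template/test_template", Collections.emptyMap(), entity);
        }
    }
}

With a template like that in place, es.index.auto.create can stay enabled and the streaming write picks up the predefined mapping instead of relying on dynamic mapping.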
Hi James! What do you mean by auto mapping (since there seem to be many definitions)?
The reason I asked about converting a Spark DataFrame schema to an Elasticsearch mapping is that I want to avoid dynamic mapping, as recommended by the Elasticsearch website itself. But if, with auto mapping, the Spark elasticsearch-hadoop connector guarantees that the types of the fields inside a document will be the same as (or equivalent to) the types in the Spark DataFrame, then that's all I need. I just want to avoid dynamic mapping.
For example:
Say I have a DataFrame with 3 columns col1, col2, col3, where col1 and col3 are ints and col2 holds a mix of whole and decimal values (1 and 1.5), so Spark types it as a double. It would look like this:
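To make that concrete, here is a rough standalone sketch of such a DataFrame (the actual values are made up for illustration; col2 mixes 1 and 1.5, which is why Spark types it as a double):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class SampleFrame {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sample").getOrCreate();

        StructType schema = new StructType()
            .add("col1", DataTypes.IntegerType)
            .add("col2", DataTypes.DoubleType)   // holds both 1 and 1.5, so it is a double in the Spark schema
            .add("col3", DataTypes.IntegerType);

        List<Row> rows = Arrays.asList(
            RowFactory.create(1, 1.0, 10),
            RowFactory.create(2, 1.5, 20));

        Dataset<Row> df = spark.createDataFrame(rows, schema);
        df.printSchema(); // col1: integer, col2: double, col3: integer
        df.show();

        spark.stop();
    }
}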
Do the fields col1, col2, col3 of all documents of type hello have the exact same datatypes? In this case they should be int, double, and int respectively.
If not, I want to create a mapping before I write documents in a streaming fashion, but for that it would be nice to have a utility function in the elasticsearch-hadoop connector that can give me the Elasticsearch schema for a given DataFrame schema.
I think you'll find that dynamic mapping is more than sufficient to detect those field types and apply them accordingly. That said, we have talked in the past about using a Spark schema to create an index mapping if one does not exist and auto create is specified.
While this seems like a good idea at first, there are some drawbacks to consider when working with field types that Elasticsearch is familiar with but Spark is not (e.g. geo coordinates, nested document structures, parent-child documents, etc.).
Using these field types would require the index mapping to be specified on the Elasticsearch side already anyway, and they are the most common fields not to be detected correctly. For most other cases we find that dynamic mapping is sufficient to detect that a field is numeric or decimal in nature, as in your example.
In the interest of keeping the connector code simple, we have opted to depend on dynamic mapping for most use cases, or on user forethought to handle the cases where dynamic mapping is inadequate, either by pre-creating the index or by using a mapping template. This may change in the future as the connector continues to evolve.
Could you please explain why you think dynamic mapping will be sufficient? If it is sufficient, why is it recommended to turn off dynamic mapping in production? It sounds to me like most people operate ES production clusters with dynamic mapping disabled.
Can you describe what would happen in this simple case and why dynamic mapping won't be a problem?
Dynamic mapping is not without its faults. It can misinterpret non-homogeneous data as well as data that is typically serialized as strings. This is often why it is disabled in production, where users would like complete and explicit control over the mappings. I fail to see how this is all that different from creating a mapping based on a Spark SQL schema, as ES mappings and Spark SQL schemas do not form a perfect 1-to-1 relationship for the more advanced Elasticsearch fields (like nested values, geo, parent-child), which would still see you creating the Elasticsearch mapping beforehand.
Speaking strictly from an ES-Hadoop/Spark integration standpoint: since the Spark schema will have col2 as a double, the serialization code will serialize the value 1 as a double field, complete with a trailing decimal place (1.0), and the next document will also be treated as a double value (1.5). Dynamic mapping should allocate double for this field's mapping, as it is the safest type to allocate for decimal numerics.
In short, while we do use the Spark SQL schema to inform the connector how best to serialize a field, we do not use it to automatically create the mapping in Elasticsearch as the main pitfalls that you may experience with dynamic mapping are still likely to be experienced with that proposed feature.
Unfortunately you will need to set the dynamic mapping up on the Elasticsearch side. The connector is not really designed to set properties in Elasticsearch. If you would like to do this from Java/Scala, you could download the Low Level Java Rest Client and perform the requests through there.
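In case it helps anyone later, a rough sketch of that idea, i.e. hand-rolling the "schema to mapping" step with the Low Level Java REST Client and df.dtypes(), could look like this. The type translation table is my own simplification (it only covers a few primitive types and falls back to keyword), and the typed request body assumes an ES version that still has mapping types, so adjust for your cluster:

import java.util.Collections;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.elasticsearch.client.RestClient;

import scala.Tuple2;

public class MappingFromSchema {

    // Very rough Spark SQL type -> Elasticsearch type translation.
    // Only a handful of primitives are handled; everything else falls back to keyword.
    static String esType(String sparkType) {
        switch (sparkType) {
            case "IntegerType":   return "integer";
            case "LongType":      return "long";
            case "DoubleType":    return "double";
            case "FloatType":     return "float";
            case "BooleanType":   return "boolean";
            case "TimestampType": return "date";
            default:              return "keyword";
        }
    }

    // Build a typed mapping from df.dtypes() and create the index with it,
    // before the streaming query writes its first document.
    static void createIndexWithMapping(Dataset<Row> df, String index, String type) throws Exception {
        StringBuilder props = new StringBuilder();
        Tuple2<String, String>[] dtypes = df.dtypes();
        for (int i = 0; i < dtypes.length; i++) {
            if (i > 0) props.append(",");
            props.append("\"").append(dtypes[i]._1()).append("\":{\"type\":\"")
                 .append(esType(dtypes[i]._2())).append("\"}");
        }
        String body = "{\"mappings\":{\"" + type + "\":{\"properties\":{" + props + "}}}}";

        try (RestClient client = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()) {
            HttpEntity entity = new NStringEntity(body, ContentType.APPLICATION_JSON);
            client.performRequest("PUT", "/" + index, Collections.emptyMap(), entity);
        }
    }

    // Usage sketch: call createIndexWithMapping(df, "test", "hello") once,
    // then start the writeStream() from earlier in the thread against "test/hello".
}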