Save PySpark DataFrame to Elasticsearch index

Using Spark 2.6 and Elasticsearch 6.8.

I am trying to save a PySpark DataFrame to an Elasticsearch index using the external elasticsearch-hadoop Spark connector:

df_to_save.write.format('org.elasticsearch.spark.sql').option('es.resource', 'spark_obj_2020/_doc').save()

The index I am saving to has a template that defines complex (object) types. The DataFrame is written to the index, but every value is stored as a string. The JSON structure is lost: a column that I built as a struct and converted to JSON ends up as plain string data.

The index template looks like this (truncated):

  "mappings": {
    "_doc": {
        "properties": {
            "properties": {
              "address": {
                "fields": {
                  "text": {
              "ip": {"type":"ip"},
              "jam": {
                 "properties": {
                   "ink": {
                     "type": "keyword",
                     "fields": {
                       "text": {
                   "stream": {
                     "type": "keyword",
                     "fields": {
                       "text": {

Spark schema:
|-- host: struct (nullable = false)
| |-- host: string (nullable = true)
|-- method: string (nullable = true)
|-- date: string (nullable = true)

The host column contains a JSON string whose structure matches the template.

Found the fix:

  1. Fix how the DataFrame column is created and typed: leave it as a struct and do not convert it to JSON:
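    from pyspark.sql import functions as F
    # Before: to_json() serialized the struct into a string column
    #df_to_save = df_to_save.withColumn("host", F.to_json(F.struct("address","ip","jam")) )
    df_to_save = df_to_save.withColumn("host", F.struct("address","ip","jam"))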
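
To see why leaving the column as a struct matters, here is a plain-Python sketch using only the standard library (no Spark or Elasticsearch involved). The sample values and the host_kind helper are made up for illustration; only the field names come from the template above. It assumes the connector sends each row to Elasticsearch as a JSON document, so a column that already holds JSON text should arrive as a single string value, while a struct arrives as a nested object that the object mapping in the template can apply to.

```python
import json

# Hypothetical sample values; only the field names come from the index template.
host = {
    "address": "example.local",
    "ip": "10.0.0.1",
    "jam": {"ink": "a", "stream": "b"},
}

# to_json-style column: the nested data is serialized first, so the
# document carries one string value under "host".
doc_with_string = {"host": json.dumps(host), "method": "GET"}

# struct-style column: the nested data stays an object inside the document.
doc_with_struct = {"host": host, "method": "GET"}


def host_kind(doc):
    """Report whether "host" is an object or a string after a JSON round trip."""
    parsed = json.loads(json.dumps(doc))
    return "object" if isinstance(parsed["host"], dict) else "string"


print(host_kind(doc_with_string))  # string
print(host_kind(doc_with_struct))  # object
```

In the first document the inner quotes are escaped and the whole value is one string, which matches what I saw in the index before the fix.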
