Spark writes documents in non-suitable indices

Hello,

I have a Spark code which loads data from multiple indices and writes back the enriched data into the origin index. Hence, I use the pattern '{index}' to specify the target index dynamically. However, Spark tries to write data even in indices which were not loaded initially. To simplify the comprehension of my problem, I load just one document from the index 'sales' and I see that Spark writes back data into the index 'sales' and another closed index available in ElasticSearch which is called sales-00001.

ElasticSearch version: 7.6.2 ;
Spark version: 2.2.2;
Scala version: 2.11.11.

My code:

  val spark = SparkSession
  .builder()
  .appName("Mrege-Articles")
  .master("local[*]")
  .config("spark.es.nodes","host:9243")
  .config("spark.es.port","443")
 
  .config("spark.es.nodes.wan.only","true")
  .config("spark.es.nodes.discovery","false")
  .config("spark.es.index.auto.create","true")
  . config("es.net.ssl", "true")
  .config("es.nodes.resolve.hostname", "false")
  .config("es.mapping.date.rich", "false")
  .config("es.read.field.as.array.include","tags,articleMerged,customerMerged")
  .config("es.write.operation", "update")
  .config("es.read.metadata", "true")
  .getOrCreate()

  val sc = spark.sparkContext

  val sql = new SQLContext(sc) 
  val articles = sql.esDF("articles","?q=product_ID:178799").limit(1)
  val sales = sql.esDF("sales","?q=product_ID:178799").limit(1) 

  articles.createOrReplaceTempView("articles")
  sales.createOrReplaceTempView("sales")

   val sqlStr =
   s"""
     |SELECT 
     |
     |   *, sales._metadata._id as saleId, sales._metadata._index as index
     |  
     |  FROM  articles JOIN  sales
     | ON sales.product_ID == articles.product_ID
  """.stripMargin

   val cfg = Map(
  ("es.resource", "{index}"),
  ("es.mapping.id", "saleId"),
  ("es.mapping.exclude" -> "saleId,index")
)
flaggedMergedArticles.saveToEs(cfg);

Regards,
Hamza

Hello @Hamza_BENNANI

Can you please run the following requests on Kibana Dev console or via curl and share the output?

GET sales

GET sales/_alias

GET _cat/indices/sales*

Hello @Luca_Belluccini,

Yes of course.

GET sales

{
  "sales" : {
"aliases" : { },
"mappings" : {
  "properties" : {
    "@timestamp" : {
      "type" : "date"
    },
    "@version" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "articleMerged" : {
      "type" : "boolean"
    },
    "bmpl_flag" : {
      "type" : "keyword"
    },
    "brand" : {
      "type" : "keyword"
    },
    "bulk_flag" : {
      "type" : "keyword"
    },
    "cat1_ID" : {
      "type" : "keyword"
    },
    "cat2" : {
      "type" : "keyword"
    },
    "cat2_ID" : {
      "type" : "keyword"
    },
    "cat3" : {
      "type" : "keyword"
    },
    "cat3_ID" : {
      "type" : "keyword"
    },
    "cat4" : {
      "type" : "keyword"
    },
    "cat5" : {
      "type" : "keyword"
    },
    "cat5_ID" : {
      "type" : "keyword"
    },
    "cust_ID" : {
      "type" : "keyword"
    },
    "cust_channel" : {
      "type" : "keyword"
    },
    "cust_key_channel_label" : {
      "type" : "keyword"
    },
    "cust_key_ctg" : {
      "type" : "keyword"
    },
    "cust_key_reg_date" : {
      "type" : "keyword"
    },
    "cust_no" : {
      "type" : "keyword"
    },
    "cust_no_unique" : {
      "type" : "keyword"
    },
    "cust_seg1" : {
      "type" : "keyword"
    },
    "cust_seg1_ID" : {
      "type" : "keyword"
    },
    "cust_seg2" : {
      "type" : "keyword"
    },
    "cust_seg2_ID" : {
      "type" : "keyword"
    },
    "cust_seg3" : {
      "type" : "keyword"
    },
    "cust_seg3_ID" : {
      "type" : "keyword"
    },
    "cust_seg4" : {
      "type" : "keyword"
    },
    "cust_seg4_ID" : {
      "type" : "keyword"
    },
    "customerMerged" : {
      "type" : "boolean"
    },
    "customer_channel_short" : {
      "type" : "keyword"
    },
    "delivery_flag" : {
      "type" : "keyword"
    },
    "email_info" : {
      "type" : "keyword"
    },
    "email_permission" : {
      "type" : "keyword"
    },
    "employee_flag" : {
      "type" : "keyword"
    },
    "host" : {
      "type" : "keyword"
    },
    "index" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "indexName" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "invoice_id" : {
      "type" : "keyword"
    },
    "leaflet_permission" : {
      "type" : "keyword"
    },
    "listing_date" : {
      "type" : "keyword"
    },
    "markdown_flag" : {
      "type" : "keyword"
    },
    "message" : {
      "type" : "keyword"
    },
    "mobile_permission" : {
      "type" : "keyword"
    },
    "order_Date" : {
      "type" : "date"
    },
    "order_Hour" : {
      "type" : "integer"
    },
    "order_ID" : {
      "type" : "keyword"
    },
    "orig_invoice_id" : {
      "type" : "keyword"
    },
    "path" : {
      "type" : "keyword"
    },
    "phone_permission" : {
      "type" : "keyword"
    },
    "product_ID" : {
      "type" : "keyword"
    },
    "product_desc" : {
      "type" : "keyword"
    },
    "qty" : {
      "type" : "float"
    },
    "reward_program" : {
      "type" : "keyword"
    },
    "saleId" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "sales_HT" : {
      "type" : "float"
    },
    "sales_TTC" : {
      "type" : "float"
    },
    "sales_channel" : {
      "type" : "keyword"
    },
    "sales_rep" : {
      "type" : "keyword"
    },
    "sales_type_id" : {
      "type" : "keyword"
    },
    "shrinkage_salout_flag" : {
      "type" : "keyword"
    },
    "sms_permission" : {
      "type" : "keyword"
    },
    "status_article" : {
      "type" : "keyword"
    },
    "store_ID" : {
      "type" : "keyword"
    },
    "subsys_art_name" : {
      "type" : "keyword"
    },
    "subsys_art_no" : {
      "type" : "keyword"
    },
    "supplier" : {
      "type" : "keyword"
    },
    "supplier_ID" : {
      "type" : "keyword"
    },
    "test" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "valid_email" : {
      "type" : "keyword"
    },
    "valid_mobile" : {
      "type" : "keyword"
    },
    "valid_phone" : {
      "type" : "keyword"
    }
  }
},
"settings" : {
  "index" : { 
    "number_of_shards" : "1",
    "auto_expand_replicas" : "0-1",
    "provided_name" : "sales",
    "creation_date" : "1589898451252",
    "number_of_replicas" : "0",
    "uuid" : "5keBAnxbRJ-bBm-5B2k8GQ",
    "version" : {
      "created" : "7060299"
    }
  }
}
  }
}

GET sales/_alias

{
  "sales" : {
    "aliases" : { }
  }
}

GET _cat/indices/sales*

green open sales-00001         5keBAnxbRJ-bBm-5B2k8GQ 1 0        1    0 53.4kb 53.4kb
green open sales-00002         NDOqio1zTGCtotSWrb24Lw 1 0        0    0   283b   283b

Sorry to ask another question...
Can you also run:

GET sales*

GET sales*/_alias

Just for more context: my feeling is you have an index Lifecycle policy in place and I want to check what is the write alias you're supposed to write to.

If we call GET sales*/_ilm/explain I wouldn't be surprised to see some errors.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.