Writing a DataFrame to Elasticsearch using Scala


(Ramanand) #1

Hi,

How do I map the fields in a dataframe using Scala code to load data into ES? For example: string to int, string to date, or string to array. Please help me achieve this.


(James Baiera) #2

If you're asking how to make a string field map as an integer, or any datatype map as a different one, the easiest way is to use a transformation function in Spark to modify the data type before writing it. Alternatively, you could use something like Elasticsearch's multi-field feature to map similar datatypes into different fields.
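
To illustrate the transformation-function approach: below is a minimal sketch using Spark SQL's `cast`, `to_date`, and `to_timestamp` functions. The column names and date formats here are made up for the example; substitute your own.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, to_date, to_timestamp}

object CastExample {
  // Hypothetical helper: cast string columns to the target types
  // before writing the dataframe to Elasticsearch
  def castColumns(raw: DataFrame): DataFrame =
    raw
      .withColumn("polygon_id", col("polygon_id").cast("long"))                                   // string -> long
      .withColumn("collection_date", to_date(col("collection_date"), "yyyy/MM/dd"))               // string -> date
      .withColumn("event_datetime", to_timestamp(col("event_datetime"), "yyyy/MM/dd HH:mm:ss"))   // string -> timestamp

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("CastExample").master("local[1]").getOrCreate()
    import spark.implicits._
    // Hypothetical input: all columns arrive as strings, as from a CSV read without a schema
    val raw = Seq(("42", "2018/05/01", "2018/05/01 10:15:30"))
      .toDF("polygon_id", "collection_date", "event_datetime")
    castColumns(raw).printSchema()
    spark.stop()
  }
}
```

Once the columns carry the right Spark types, the connector writes them accordingly, so the casts should happen before `saveToEs`.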


(Ramanand) #3

James,
Thanks for the reply!
It's good to know that any datatype can be mapped to another. To be specific, my requirement is to convert: 1. string to int, 2. string to date or timestamp.


(Ramanand) #4

I am using the code below to ingest data into Elasticsearch from a Spark dataframe.

package com.accenture.ds.elasticsearchtest

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders
import org.elasticsearch.spark.sql._
import java.sql.Date
import java.sql.Timestamp

object ElasticSearchTest extends App {
  
  case class UBSchema(accenture_device_id : String,lat : Double,lon : Double,polygon_id : Long,unixtime : Long,batch_id : String,data_collection_date : Date,input_file_name : String ,event_datetime : Timestamp)

  val spark = SparkSession.builder.appName("SparkSessionESExample").master("local").getOrCreate()

  val sqlContext = spark.sqlContext
  // spark.conf.set("es.nodes.wan.only", "true")
  // es-hadoop talks to Elasticsearch over HTTP, so use the REST port (9200), not the transport port (9300)
  spark.conf.set("es.nodes", "localhost")
  spark.conf.set("es.port", "9200")
  val RawFileSchema = Encoders.product[UBSchema].schema
  
  val tempdf = sqlContext.read.format("csv").option("header", true).schema(RawFileSchema).load("C:/Personal/Sample vendor files/samplefile_actual_10.csv")
  tempdf.show()
  tempdf.printSchema()
  
  // saveToEs options must be es-hadoop configuration keys; "strict_date_optional_time"
  // is a date format, not a valid setting. Date handling is controlled by the index
  // mapping in Elasticsearch, not by the connector.
  tempdf.saveToEs("ubtest/v10es")
  println("data loaded in ES..")
}

Below is the schema created at the Spark level, with the proper datatypes:

 |-- accenture_device_id: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- polygon_id: long (nullable = true)
 |-- unixtime: long (nullable = true)
 |-- batch_id: string (nullable = true)
 |-- data_collection_date: date (nullable = true)
 |-- input_file_name: string (nullable = true)
 |-- event_datetime: timestamp (nullable = true) 

When I load the data into Elasticsearch, the resulting mapping is:

 "mapping": {
    "v10es": {
      "properties": {
        "accenture_device_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "batch_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "data_collection_date": {
          "type": "long"
        },
        "event_datetime": {
          "type": "long"
        },
        "input_file_name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "lat": {
          "type": "float"
        },
        "lon": {
          "type": "float"
        },
        "polygon_id": {
          "type": "long"
        },
        "unixtime": {
          "type": "long"
        }
      }
    }
  } 

In the mapping section, the date and timestamp columns have been saved as long. The requirement is to load the data so that they keep the date datatype. Please help me achieve this.
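
A likely explanation, based on the `long` values you see: the connector serializes Spark date/timestamp values as epoch milliseconds, and Elasticsearch's dynamic mapping then infers `long` from those numbers. A common workaround (a sketch, not verified against your cluster) is to create the index with an explicit mapping before the first Spark write, so those fields are stored as dates:

```json
PUT ubtest
{
  "mappings": {
    "v10es": {
      "properties": {
        "data_collection_date": { "type": "date", "format": "epoch_millis" },
        "event_datetime":       { "type": "date", "format": "epoch_millis" }
      }
    }
  }
}
```

With this mapping in place, `tempdf.saveToEs("ubtest/v10es")` should index the millisecond values as dates, while the remaining fields can still be mapped dynamically.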