Hi,
How do I map the fields of a DataFrame using Scala code when loading data into ES? For example: string to int, string to date, or string to array. Please help me achieve this.
If you're asking how to make a string field map as an integer, or any datatype map as a different one, the easiest way would be to use a transformation in Spark to change the data type before writing. Additionally, you could use something like Elasticsearch's multi-field feature to index the same value under several different mappings.
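For instance, a minimal sketch of the Spark-side cast approach (assuming Spark 2.2+; the column names, file path, and date formats below are illustrative, not taken from this thread):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split, to_date, to_timestamp}

object CastBeforeWriteExample extends App {
  val spark = SparkSession.builder.appName("CastBeforeWriteExample").master("local[*]").getOrCreate()

  // Without an explicit schema, every CSV column is read as a string.
  val raw = spark.read.option("header", "true").csv("input.csv")

  // Convert the string columns to the target types before writing to Elasticsearch.
  val typed = raw
    .withColumn("polygon_id", col("polygon_id").cast("long"))                                 // string -> long
    .withColumn("data_collection_date", to_date(col("data_collection_date"), "yyyy-MM-dd"))   // string -> date
    .withColumn("event_datetime", to_timestamp(col("event_datetime"), "yyyy-MM-dd HH:mm:ss")) // string -> timestamp
    .withColumn("tags", split(col("tags"), ","))                                              // string -> array<string>

  typed.printSchema()
}

The same idea applies to any other conversion: fix the types on the DataFrame first, then write the result to Elasticsearch.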
James,
Thanks for the reply!
It's good to know that one datatype can be mapped to another. Specifically, my requirement is to convert (1) string to int and (2) string to date or timestamp.
I am using the below code to ingest data into Elasticsearch from a Spark DataFrame.
package com.accenture.ds.elasticsearchtest
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._
import java.sql.Date
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.Encoders
import java.sql.Timestamp
object ElasticSearchTest extends App {
case class UBSchema(accenture_device_id : String,lat : Double,lon : Double,polygon_id : Long,unixtime : Long,batch_id : String,data_collection_date : Date,input_file_name : String ,event_datetime : Timestamp)
val sparkConf = SparkSession.builder.appName("SparkSessionESExample").master("local").getOrCreate()
val sqlContext = sparkConf.sqlContext
// sparkConf.conf.set("es.nodes.wan.only", "true")
sparkConf.conf.set("es.nodes", "localhost")
sparkConf.conf.set("es.port", "9300")
sparkConf.conf.set("spark.es.nodes", "localhost")
sparkConf.conf.set("spark.es.port", "9300")
// sqlContext.setConf("es.nodes.wan.only", "true")
// sqlContext.setConf("es.nodes", "localhost")
// sqlContext.setConf("spark.es.nodes", "localhost")
// sqlContext.setConf("spark.es.port", "9300")
val RawFileSchema = Encoders.product[UBSchema].schema
val tempdf = sqlContext.read.format("csv").option("header", true).schema(RawFileSchema).load("C:/Personal/Sample vendor files/samplefile_actual_10.csv")
tempdf.show()
tempdf.printSchema()
tempdf.saveToEs("ubtest/v10es",Map("strict_date_optional_time" -> "epoch_millis||yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"))
println("data loaded in ES..")
}
Below is the schema created at the Spark level, with the correct data types:
|-- accenture_device_id: string (nullable = true)
|-- lat: double (nullable = true)
|-- lon: double (nullable = true)
|-- polygon_id: long (nullable = true)
|-- unixtime: long (nullable = true)
|-- batch_id: string (nullable = true)
|-- data_collection_date: date (nullable = true)
|-- input_file_name: string (nullable = true)
|-- event_datetime: timestamp (nullable = true)
When I load the data into Elasticsearch, these are the mappings that get created:
"mapping": {
"v10es": {
"properties": {
"accenture_device_id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"batch_id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"data_collection_date": {
"type": "long"
},
"event_datetime": {
"type": "long"
},
"input_file_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"lat": {
"type": "float"
},
"lon": {
"type": "float"
},
"polygon_id": {
"type": "long"
},
"unixtime": {
"type": "long"
}
}
}
}
In the mapping, the date and timestamp columns have been saved as long. The requirement is to have them indexed as the date datatype. Please help me achieve this.
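This is not from the thread itself, but a sketch of how this is commonly handled, assuming dynamic mapping is in use and the data is written to a fresh index (an existing long mapping cannot be changed in place). Note that the Map passed to saveToEs is read as connector configuration, not as an index mapping, so a date-format string there does not affect the field types. Two usual options are: pre-create the index with an explicit mapping such as "event_datetime": {"type": "date", "format": "epoch_millis"} before running the job, or format the date/timestamp columns as ISO-8601 strings in Spark so that Elasticsearch's default dynamic date detection maps them as date. The snippet below shows the second option, reusing the tempdf from the code above:

import org.apache.spark.sql.functions.{col, date_format}
import org.elasticsearch.spark.sql._

// Format the temporal columns as ISO-8601 strings; on a new index,
// dynamic date detection then maps them as "date" instead of "long".
val esReady = tempdf
  .withColumn("event_datetime", date_format(col("event_datetime"), "yyyy-MM-dd'T'HH:mm:ss"))
  .withColumn("data_collection_date", date_format(col("data_collection_date"), "yyyy-MM-dd"))

esReady.saveToEs("ubtest/v10es")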