Elasticsearch Spark connector not able to map longitude, latitude values to geo_point


(Pavan Penhandra) #1

Hi,

When I write longitude and latitude values in string format with Spark SQL, the "location" field is mapped to string instead of geo_point.

I followed the format
https://www.elastic.co/guide/en/elasticsearch/reference/current/geo-point.html

Query: sqlContext.sql("SELECT vehicle_id, device_id, creation_time, timestamp_hour, engine_rpm, concat(lon, ',', lat) AS location FROM mytable").saveToEs("iot_location/docs")

I expected the string value from Spark SQL to be mapped to geo_point in Elasticsearch, but it is only mapped to string.

When I tried an array of (longitude, latitude), Elasticsearch mapped it to an array.
When I tried a structure of (longitude, latitude), Elasticsearch mapped it to a structure.
I need "location" to be a geo_point so that I can use this field in Kibana and Grafana.

location "41.12,-71.34" as a string
location [-71.34, 41.12] as an array
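
For reference, the geo_point documentation linked above uses a different coordinate order for each shape: the string form is "lat,lon", while the array form is [lon, lat]. A query that concatenates lon before lat would therefore produce a string in the reversed order. The following is a minimal sketch of both shapes; `GeoPointFormat` and its methods are hypothetical names used only for illustration and are not part of elasticsearch-spark.

```scala
// Hypothetical helper for illustration; not part of elasticsearch-spark.
object GeoPointFormat {
  // String form: latitude first, then longitude ("lat,lon").
  def asLatLonString(lat: Double, lon: Double): String = s"$lat,$lon"

  // Array form: longitude first, then latitude ([lon, lat]).
  def asLonLatArray(lat: Double, lon: Double): Seq[Double] = Seq(lon, lat)

  def main(args: Array[String]): Unit = {
    println(asLatLonString(41.12, -71.34)) // prints 41.12,-71.34
    println(asLonLatArray(41.12, -71.34))
  }
}
```

Either shape is only indexed as a geo_point when the target field is already mapped with that type.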

My index mappings

    "index": {
        "mappings": {
            "sensorinfo": {
                "properties": {
                    "creation_time": {
                        "format": "strict_date_optional_time||epoch_millis",
                        "type": "date"
                    },
                    "location": {
                        "type": "geo_point",
                        "lat_lon": "true"
                    },
                    "engine_rpm": {
                        "type": "float"
                    },
                    "timestamp_hour": {
                        "format": "strict_date_optional_time||epoch_millis",
                        "type": "date"
                    },
                    "device_id": {
                        "type": "string",
                        "analyzer": "analyzer-name"
                    },
                    "vehicle_id": {
                        "type": "string",
                        "analyzer": "analyzer-name"
                    }
                }
            }
        },
},

Changed mappings after writing data to Elasticsearch:

"mappings": {
    "docs": {
        "properties": {
            "creation_time": {
                "format": "strict_date_optional_time||epoch_millis",
                "type": "date"
            },
            "timestamp_hour": {
                "format": "strict_date_optional_time||epoch_millis",
                "type": "date"
            },
            "device_id": {
                "type": "string"
            },
            "location": {
                "type": "string"
            },
            "engine_rpm": {
                "type": "double"
            },
            "vehicle_id": {
                "type": "string"
            }
        }
    }
},
"aliases": [ ]

}


(James Baiera) #2

Based on your post, I'm assuming that you have created a type in the index called sensorinfo and specified the mapping but it looks like you're writing to the docs type in the code example. If the docs type does not exist before writing, then Elasticsearch may be trying to infer the type of the location field for you (and is getting it wrong). Does the issue still happen if you write the data to the already created and mapped iot_location/sensorinfo?
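
The inference described here can be pictured with a simplified model. Dynamic mapping chooses a field type from the JSON shape of the first value it sees, and it never chooses geo_point on its own, so a "lat,lon" string ends up as a string field unless the mapping already exists. The sketch below is only an illustration under that assumption: `DynamicMappingSketch` and `inferredType` are hypothetical names, and the real server logic covers more cases (date detection, for example).

```scala
// Simplified, hypothetical model of dynamic mapping in Elasticsearch 2.x.
// It is an illustration only and does not reproduce the server's actual code.
object DynamicMappingSketch {
  def inferredType(value: Any): String = value match {
    case _: String               => "string"  // "41.12,-71.34" stays a string
    case _: Double | _: Float    => "double"
    case _: Long | _: Int        => "long"
    case _: Boolean              => "boolean"
    case _: Map[_, _]            => "object"  // a (lat, lon) struct becomes an object
    case s: Seq[_] if s.nonEmpty => inferredType(s.head) // arrays take their element type
    case _                       => "unknown"
  }

  def main(args: Array[String]): Unit = {
    println(inferredType("41.12,-71.34"))
    println(inferredType(Seq(-71.34, 41.12)))
    println(inferredType(Map("lat" -> 41.12, "lon" -> -71.34)))
  }
}
```

None of the three shapes yields geo_point in this model, which matches the string, array, and structure results reported in the first post.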


(Pavan Penhandra) #3

The issue still happens even when I write the data to the already created and mapped iot_elastic/sensorinfo:

{"iot_elastic":
{"mappings":
{"sensorinfo":{"properties":{"creation_time":{"type":"date","format":"strict_date_optional_time||epoch_millis"},"device_id":{"type":"string"},"engine_rpm":{"type":"double"},"location":{"type":"string"},"timestamp_hour":{"type":"date","format":"strict_date_optional_time||epoch_millis"},"vehicle_id":{"type":"string"}}}}}}


(James Baiera) #4

@Pavan_Penhandra, I tried to recreate this behavior on the master branch but I am unable to do so. I've tried the following code:

val rc = new ExtendedRestClient()

// touch the index and put the mappings
rc.put("iot_location")
rc.put("iot_location/sensorinfo/_mapping", """{"sensorinfo":{"properties":{"name":{"type": "string"},"location":{"type": "geo_point"}}}}""".getBytes(StringUtils.UTF_8))

sqlContext.createDataFrame(
    Seq(Row.fromSeq(Seq("Generic Location", "41.12,-71.34"))).asJava,
    StructType(Array(StructField("name", StringType), StructField("location", StringType)))
).saveToEs("iot_location/sensorinfo")

rc.post("iot_location/_refresh")

val df = sqlContext.read.format("es").load("iot_location")

// In SparkSQL, if location is read it is treated as a string
val dataType = df.schema("location").dataType
assertEquals("string", dataType.typeName)

// Getting the mapping should show that location is a GEO_POINT
val mappingData = RestUtils.getMapping("iot_location/sensorinfo")
assertEquals(FieldType.GEO_POINT, mappingData.properties().find(_.name() == "location").map(_.`type`()).get)

// Data exists in the index
val head = df.head()
assertThat(head.getString(0), containsString("41.12,-71.34"))
assertThat(head.getString(1), containsString("Generic Location"))

// We'll get the raw mapping response, just to double check
println("RAW MAPPING:")
println(rc.get("iot_location/_mapping"))
rc.close()

The output of the program is:

RAW MAPPING:
{"iot_location":{"mappings":{"sensorinfo":{"properties":{"location":{"type":"geo_point"},"name":{"type":"text"}}}}}}

Could you share which version of Elasticsearch and ES-Hadoop you are using?


(Pavan Penhandra) #5

"org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
"org.apache.spark" %% "spark-streaming-kafka" % "1.6.1",
"org.elasticsearch" % "elasticsearch-spark_2.10" % "2.3.4" % "provided",
"org.apache.spark" % "spark-sql_2.10" % "1.6.1",


(Pavan Penhandra) #6

{

"state": "open",
"settings": {
    "index": {
        "mappings": {
            "sensorinfo": {
                "properties": {
                    "creation_time": {
                        "format": "strict_date_optional_time||epoch_millis",
                        "type": "date"
                    },
                    "location": {
                        "type": "geo_point"
                    },
                    "engine_rpm": {
                        "type": "float"
                    },
                    "timestamp_hour": {
                        "format": "strict_date_optional_time||epoch_millis",
                        "type": "date"
                    },
                    "device_id": {
                        "type": "string",
                        "analyzer": "analyzer-name"
                    },
                    "vehicle_id": {
                        "type": "string",
                        "analyzer": "analyzer-name"
                    }
                }
            }
        },
        "number_of_shards": "1",
        "creation_date": "1474525784322",
        "analysis": {
            "analyzer": {
                "analyzer-name": {
                    "filter": "lowercase",
                    "type": "custom",
                    "tokenizer": "keyword"
                }
            }
        },
        "number_of_replicas": "1",
        "uuid": "Thj63TjXTMG4F8piliIP-g",
        "version": {
            "created": "2030599"
        }
    }
},
"mappings": {
    "sensorinfo": {
        "properties": {
            "creation_time": {
                "format": "strict_date_optional_time||epoch_millis",
                "type": "date"
            },
            "timestamp_hour": {
                "format": "strict_date_optional_time||epoch_millis",
                "type": "date"
            },
            "device_id": {
                "type": "string"
            },
            "location": {
                "type": "string"
            },
            "engine_rpm": {
                "type": "double"
            },
            "vehicle_id": {
                "type": "string"
            }
        }
    }
},
"aliases": [ ]

}


(James Baiera) #7

@Pavan_Penhandra, are you sure you are creating the index mappings correctly? It looks like you are inserting the mappings section into the settings block and it's being set as a setting instead of being recognized as a mapping. Can you create the index like so?

curl -XPUT localhost:9200/iot_location -d '
{
    "settings": {
        "index": {
            "number_of_shards": "1",
            "number_of_replicas": "1",
            "analysis": {
                "analyzer": {
                    "analyzer-name": {
                        "filter": "lowercase",
                        "type": "custom",
                        "tokenizer": "keyword"
                    }
                }
            }
        }
    },
    "mappings": {
        "sensorinfo": {
            "properties": {
                "creation_time": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                },
                "location": {
                    "type": "geo_point"
                },
                "engine_rpm": {
                    "type": "float"
                },
                "timestamp_hour": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                },
                "device_id": {
                    "type": "string",
                    "analyzer": "analyzer-name"
                },
                "vehicle_id": {
                    "type": "string",
                    "analyzer": "analyzer-name"
                }
            }
        }
    }
}'
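
A quick way to spot this problem is to look for a "mappings" key underneath "settings" in the index metadata, as in the output shown in post #6. The sketch below assumes the metadata has already been parsed into nested Scala maps; `MappingPlacementCheck`, `lookup`, and `mappingsStoredAsSetting` are hypothetical names and not part of any Elasticsearch client.

```scala
// Hypothetical helper for illustration: index metadata is modelled as nested
// Scala maps instead of being parsed from the JSON response.
object MappingPlacementCheck {
  // Walks a path of keys through nested maps; returns the value if every step exists.
  def lookup(doc: Map[String, Any], path: String*): Option[Any] =
    path.foldLeft(Option[Any](doc)) {
      case (Some(m: Map[_, _]), key) => m.asInstanceOf[Map[String, Any]].get(key)
      case _                         => None
    }

  // True when a "mappings" block was stored as an index setting
  // (settings.index.mappings) rather than as a real mapping.
  def mappingsStoredAsSetting(indexMetadata: Map[String, Any]): Boolean =
    lookup(indexMetadata, "settings", "index", "mappings").isDefined

  def main(args: Array[String]): Unit = {
    val geoPoint = Map("properties" -> Map("location" -> Map("type" -> "geo_point")))
    // Shape seen in post #6: the intended mapping sits inside settings.index.
    val misplaced: Map[String, Any] = Map(
      "settings" -> Map("index" -> Map("mappings" -> Map("sensorinfo" -> geoPoint)))
    )
    // Shape produced by the create-index request above: mappings at the top level.
    val correct: Map[String, Any] = Map(
      "settings" -> Map("index" -> Map("number_of_shards" -> "1")),
      "mappings" -> Map("sensorinfo" -> geoPoint)
    )
    println(mappingsStoredAsSetting(misplaced)) // prints true
    println(mappingsStoredAsSetting(correct))   // prints false
  }
}
```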

(Pavan Penhandra) #8

@james.baiera Yes, you are correct. I created the index as shown above and the mapping worked as expected.
Thank you very much :)


(James Baiera) #9

@Pavan_Penhandra Cheers!

