Mapping Hive Table Fields to Nested Fields in ElasticSearch


(Gowtham Sadasivam) #1

I'm trying to map a field from a Hive table to a nested Elasticsearch field, but I'm not sure how to achieve that. Here is what my configuration looks like:

(1) HIVE TABLE WITH ES SerDe:
CREATE EXTERNAL TABLE logs (
  id string, clientid string, lat string, lon string, adid string,
  istdt string, event string, istts bigint, creative_size string,
  app_id string, date string, hour string, exchangename string, logtype string
)
ROW FORMAT SERDE 'org.elasticsearch.hadoop.hive.EsSerDe'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
  'es.nodes'='10.10.14.122',
  'es.mapping.names' = 'date:@timestamp, lat:geopoint.lat, lon:geopoint.lon',
  'es.resource'='{exchangename}-{date:YYYY.MM.dd}/{logtype}'
);

^ You can see I'm mapping the date field to @timestamp (which works perfectly), and mapping the lat field to geopoint.lat and the lon field to geopoint.lon (which does NOT work). I got the error: "." character not allowed in field name.
I'm already aware that "." is not allowed in a field name. All I'm trying to do is map to the nested field geopoint {"lat": xxxxx, "lon": xxxxx}.

I even tried the Logstash way of specifying nested fields, i.e. lat:[geopoint][lat], lon:[geopoint][lon]. But that ended up creating new fields named "[geopoint][lat]" and "[geopoint][lon]" (literally, with the square brackets).

I wanted to know how to map a Hive table field to a nested Elasticsearch field. Any help/suggestion is appreciated. Thanks :slight_smile:
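To make the goal concrete, here's a rough sketch in plain Python (not connector code; the helper name is made up) of the transformation I'm after: the flat lat/lon columns from the table above nested under a single geopoint object.

```python
# Rough sketch (not connector code): flat Hive row -> nested ES document.
def nest_geopoint(row):
    """Move the flat lat/lon columns under a single 'geopoint' object."""
    doc = {k: v for k, v in row.items() if k not in ("lat", "lon")}
    doc["geopoint"] = {"lat": row["lat"], "lon": row["lon"]}
    return doc

flat = {"id": "1", "lat": "18.73793", "lon": "128.922", "event": "click"}
nested = nest_geopoint(flat)
# nested == {'id': '1', 'event': 'click',
#            'geopoint': {'lat': '18.73793', 'lon': '128.922'}}
```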


(Costin Leau) #2

The table definition is quite hard to read.
Geo-points are "special" in that lat and lon are not actual fields but rather ways to express the geolocation; internally, ES might use a different structure. An alternative is to simply declare a geo object in Hive and then break down the value it receives from ES in Hive.
Since Hive doesn't have first-class support for geo types, it's all about experimenting with a mapping that works.
What versions of ES and ES-Hadoop are you using?


(Gowtham Sadasivam) #3

Yes, I understood that point completely.

But I didn't understand these lines.

Let me try to explain the problem again. I'm trying to push my Hive table data to ES. The point where I'm stuck is that I can't MAP the Hive table values of lat and lon to a nested field in ES; I just need help with this particular mapping. As I mentioned earlier, I have lat and lon as separate fields in the Hive table and am trying to map them to geopoint.lat and geopoint.lon (where geopoint is a JSON object field). Since the dot character is NOT allowed in field names, I'm not able to map the nested ES fields to the Hive table fields. I hope the problem is clear now.

Elasticsearch version: 2.3.1
ES-Hadoop version: v2.3.0


(Gowtham Sadasivam) #4

Hello again,

I just upgraded to Elastic Stack 5 and I'm still not able to solve this problem.

Since ES 5 brings back support for dots in field names, I've moved a step ahead in mitigating this mapping issue.

Now I can map field names containing dots via 'es.mapping.names'. But unfortunately, I can't get the field typed as 'geo_point' instead of 'object': trying to index an object field geo_custom {"lat": "xx.xxx", "lon": "yy.yyy"} into Elasticsearch throws an error. Let me explain the issue again with some more details; maybe you or someone else can help me with this.

Creating Table in Hive:

CREATE EXTERNAL TABLE IF NOT EXISTS test (
            geo_lat string,
            geo_lon string,
            date string
            )
            ROW FORMAT SERDE 'org.elasticsearch.hadoop.hive.EsSerDe'
            STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
            TBLPROPERTIES (
                'es.nodes'='xx.xxx.xxx.xx',
                'es.mapping.names' = 'date:@timestamp, geo_lat:geo_custom.lat, geo_lon:geo_custom.lon',
                'es.resource'='xxxxx-{date:YYYY.MM.dd}'
            );

^ Here I have lat and lon as separate fields and am mapping them into a new custom field named "geo_custom", intended as a geo_point object, as mentioned here.

ElasticSearch Template:

{
      "template":"xxxxx-*",
      "settings":{
        "index":{
          "mapping.ignore_malformed":true,
          "number_of_shards":5,
          "refresh_interval":"-1"
        }
      },
      "mappings":{
        "Yyyyyy":{
          "properties":{
            "geo_lat":{
              "type":"keyword"
            },
            "geo_lon":{
              "type":"keyword"
            },
            "geo_custom":{
              "type":"geo_point"
            },
            "date":{
              "type":"date",
              "format":"yyyy-MM-dd"
            }
          }
        }
      }
    }

^ Here I'm creating my template, specifying "geo_custom" as type "geo_point".

Hive Query:

INSERT INTO TABLE test SELECT geo['lat'],geo['lon'],date FROM source_table WHERE date='2016-11-02';

^ Hive query to insert data from HDFS into the Elasticsearch-backed table. Executing this query fails with the following errors.

Error Logged Hive Console:

Error: java.lang.RuntimeException: Hive Runtime Error while closing operators: Found unrecoverable error [xx.xxx.xxx.xx:9200] returned Bad Request(400) - Could not dynamically add mapping for field [geo_custom.lat]. Existing mapping for [geo_custom] must be of type object but found [geo_point].; Bailing out..
    	at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.close(ExecReducer.java:295)
    	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:453)
    	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:422)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
    Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [xx.xxx.xxx.xx:9200] returned Bad Request(400) - Could not dynamically add mapping for field [geo_custom.lat]. Existing mapping for [geo_custom] must be of type object but found [geo_point].; Bailing out..
    	... 7 more

Error Logged in the Elasticsearch Log:

[2016-11-11T17:15:02,172][DEBUG][o.e.a.b.TransportShardBulkAction] [test-es03] [xxxxx-2016.11.02][3] failed to execute bulk item (index) index {[xxxxx-2016.11.02][Yyyyy][AVhTNIpmSQciAFrftxGY], source[{"geo_custom.lat":"18.73793","geo_custom.lon":"128.922"}]}
org.elasticsearch.index.mapper.MapperParsingException: Could not dynamically add mapping for field [geo_custom.lat]. Existing mapping for [geo_custom] must be of type object but found [geo_point].
	at org.elasticsearch.index.mapper.DocumentParser.getDynamicParentMapper(DocumentParser.java:843) ~[elasticsearch-5.0.0.jar:5.0.0]
	at org.elasticsearch.index.mapper.DocumentParser.parseValue(DocumentParser.java:569) ~[elasticsearch-5.0.0.jar:5.0.0]
	at......
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]

Basically, this shows that I can't ingest/map data of type "object" into a field of type "geo_point". The point is that Hive doesn't know what a "geo_point" data type is. How can I solve this issue? Any suggestion or alternate ingestion method would be greatly appreciated.
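To illustrate what the error log above is complaining about, here's a small Python sketch (illustrative only; values taken from the log). A geo_point field accepts a single value, either a {"lat": ..., "lon": ...} object or a "lat,lon" string, whereas the dotted mapping here makes the connector send two literal dotted keys, which ES 5 expands into an object path that clashes with the geo_point mapping:

```python
lat, lon = "18.73793", "128.922"   # values from the ES log above

# Shapes a geo_point mapping accepts for a single field value:
accepted_object = {"geo_custom": {"lat": float(lat), "lon": float(lon)}}
accepted_string = {"geo_custom": lat + "," + lon}   # "lat,lon" string form

# What the failing bulk request actually contained: two dotted keys, which
# ES 5 expands into an object path under geo_custom, clashing with geo_point.
rejected = {"geo_custom.lat": lat, "geo_custom.lon": lon}
```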

Thanks :slight_smile:


(Rahul ) #5

I am also trying to map latitude and longitude from a Hive table to Elasticsearch. But as mentioned in the earlier posts, Hive doesn't support the geo_point data type. Surfing the internet, I found that we need to map Hive table columns to Elasticsearch while creating the Hive table, using 'es.mapping.names'.

I tried to map the Hive columns to the Elasticsearch geo_point, but I'm unable to map it properly.

Following is the Hive table definition:
> create table latlong_demo(id int,lat string,long string)
> row format delimited
> fields terminated by ',';

Following is the mapping from the Hive table to Elasticsearch:

CREATE EXTERNAL TABLE elastic_latlong_demo2 (key bigint, lat string, long string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource'='latlong_demo/record','es.nodes'='xxx.xxx.xxx.xxx','es.port'='xxxx','es.mapping.names'='lat:geopoint.lat,long:geopoint.lon');

So kindly suggest a solution for getting the latitude and longitude string data from the Hive table into an Elasticsearch geo_point field.


(Rahul ) #6

Solution:

I resolved this issue as follows.

1. First create the index on the Elasticsearch node, then proceed to Hive and create the external table hive_to_elastic_geo_table:

curl -XPOST xx.xxx.x.xxx:xxxx/my_index -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"my_type" : {
"properties" : {
"key" : { "type" : "string" },
"location" : {
"type" : "geo_point"
}
}
}
}
}'

Follow these steps on the Hive client or Hive node:
1.1 Create a text file that stores the location data:

nano geoloc_details.txt

1 29.72156792, 95.53731703
2 43.7167, 10.3833

Format of the above file:
id <\t> "latitude, longitude" <\n>
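If it helps, here's a quick Python sketch (using the two sample rows above; the file name matches step 1.1) for generating lines in that format:

```python
# Write rows in the "id <tab> lat, lon" format described above.
rows = [(1, "29.72156792", "95.53731703"), (2, "43.7167", "10.3833")]
lines = ["%d\t%s, %s" % (rid, lat, lon) for rid, lat, lon in rows]

with open("geoloc_details.txt", "w") as fh:
    fh.write("\n".join(lines) + "\n")
```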

1.2 Command to create the Hive table:

CREATE TABLE IF NOT EXISTS geoloc_table ( id int, location String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

1.3 Command to load data into the Hive table from the text file:

load data local inpath '/path/to/geoloc_details.txt' into table geoloc_table;

1.4 Command to create the external table that indexes data from Hive into Elasticsearch:

CREATE EXTERNAL TABLE hive_to_elastic_geo_table (id int, location string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource'='my_index/my_type','es.nodes'='xx.xxx.x.xxx','es.port'='9200','es.mapping.id'='id');

1.5 Command to insert data from the Hive table into the external table that is in sync with Elasticsearch:

INSERT OVERWRITE TABLE hive_to_elastic_geo_table SELECT * FROM geoloc_table;

Thank you.


(Kulasangar Gowrisangar) #7

@hadoopresearcher I've got a small concern on this.

I tried the steps you've mentioned, but then when I tried to do the last one:[quote="hadoopresearcher, post:6, topic:47389"]
INSERT OVERWRITE TABLE hive_to_elastic_geo_table SELECT * FROM geoloc_table;
[/quote]

It starts as a job and then freezes. It displays:

Number of reduce tasks is set to 0 since there's no reduce operator

Is there anything I should be doing apart from these?

Thanks!


(Rahul ) #8

What is Hive using as its backend?
Check whether Hive is working properly, and whether its metastore and HiveServer2 are working properly.
Check whether the Hadoop and Elasticsearch clusters are running properly.

How many nodes does the Hadoop cluster backing Hive have?

Does a simple Hive query run on the system? Try this query: "select count(*) from geoloc_table;"

And let me know whether it works on your system.


(Kulasangar Gowrisangar) #9

@hadoopresearcher thank you so much for the quick response.[quote="hadoopresearcher, post:8, topic:47389"]
What is hive using in backend ?
[/quote]

in terms of? I didn't get this?[quote="hadoopresearcher, post:8, topic:47389"]
Check if hive is working properly and its metastore or hive server2 working properly or not.
[/quote]

Yes both of them are up.[quote="hadoopresearcher, post:8, topic:47389"]
Check if hadoop and elasticsearch cluster is running properly or not ?
[/quote]

Yes both of them are running properly.[quote="hadoopresearcher, post:8, topic:47389"]
How many nodes hadoop cluster are you using for hive backend ?
[/quote]

I'm using a single node.

This works for me; I can even see the data I have.

The problem arises only when I try to overwrite data from geoloc_table into hive_to_elastic_geo_table.

Thanks again!


(Rahul ) #10

in terms of? I didn't get this?
If it is using Hadoop, then check whether the NodeManager, DataNode, ResourceManager and NameNode are up.

This works for me. Even I could see the data which I have.

Does the aggregate count function in the query give the appropriate count? I am not asking about a simple select query; the query is:
"select count(*) from geoloc_table;"

What engine is Hive using in the backend?
Check in the hive-site.xml file; there should be a property named "engine".

Kindly forward the Hive logs after running the insert overwrite query in Hive.


(Kulasangar Gowrisangar) #11

@hadoopresearcher thank you so much again. [quote="hadoopresearcher, post:10, topic:47389"]
If it is using hadoop then check whether nodemanager, datanode, resource manager and namenode are up.
[/quote]

Yes they're up when I typed in jps.I could see the processes.[quote="hadoopresearcher, post:10, topic:47389"]
" select count(*) from geoloc_tables; "
[/quote]

This doesn't work, apparently. :disappointed: It takes a very long time and then gives this exception.

I checked in the XML, but there's no property called "engine". What I've got there is:

<configuration>
   <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://localhost:3306/metastore?useSSL=false</value>
      <description>metadata is stored in a MySQL server</description>
   </property>
   <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
      <description>MySQL JDBC driver class</description>
   </property>
   <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>root</value>
      <description>user name for connecting to mysql server</description>
   </property>
   <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>root</value>
      <description>password for connecting to mysql server</description>
   </property>
  <property>
      <name>hive.aux.jars.path</name>
      <value>/pathto/elasticsearch-hadoop-5.4.1.jar</value>
      <description>A comma separated list (with no spaces) of the jar files</description>
  </property>
</configuration>

I'm sorry, I'm still a noob at this. How do I get the logs?

Thanks again!


(Rahul ) #12

Hey, there is no need to be sorry or feel bad about being a noob. Everyone starts as a noob; that is why forums like this exist, so we can discuss and collaboratively seek a solution, turning a difficult job into an easy process.

  • Secondly, the location of the logs will be mentioned in one of the XML files.

  • In the hive.aux.jars.path property, kindly mention the appropriate path to the Elasticsearch jar file.

  • "Connection refused" means either a firewall is blocking the required access, so you need to turn off the firewall,

  • or the ResourceManager is not running, because 8032 is the port on which the ResourceManager process runs.

Thanks, and feel free to ping as many times as you want with whatever doubt you may have. Don't stop till you get the solution. Cheers.


(Kulasangar Gowrisangar) #13

@hadoopresearcher thank you so much for the guidance and I really appreciate that. :+1:

Umm, well, I'm actually printing the log to the console itself once I run select count(*) from geoloc_tables; I've copy-pasted the whole log after the execution here.[quote="hadoopresearcher, post:12, topic:47389"]
hive.aux.jars.path in this property kindly mention appropriate path of the elasticsearch jar file.
[/quote]

Ah, I've actually specified the real path; when I pasted it here I just wrote a placeholder path for privacy. :smile:[quote="hadoopresearcher, post:12, topic:47389"]
Connection refused means either firewall is blocking the access required. so you need to turn off the firewall.
[/quote]

I'm actually running Hadoop and Hive on a Linux server. So do I have to turn it off there?

But then when I type in jps in the terminal I can see these:

5748 Jps
5222 ResourceManager
32343 RunJar
5575 NodeManager
5034 SecondaryNameNode
2795 RunJar
4795 DataNode
14908 Elasticsearch
3103 RunJar

Thank you so much again, for the effort you're putting in.


(Rahul ) #14
  • Yes, you need to disable the firewall on the machine on which you are running Hadoop and Hive. But that is not advisable in production mode; experimentation is allowed in development mode only.
  • Secondly, the NameNode is not running, so make it run.

(Kulasangar Gowrisangar) #15

@hadoopresearcher Thank you again.

I'm doing this in my dev env, and the firewall was disabled by default. But still no luck :pensive:[quote="hadoopresearcher, post:14, topic:47389"]
Secondly Namenode is not running. So make it run.
[/quote]

It is running, right, as per the jps output in my previous reply?


(翟玉勇) #16

create table student_test (
  id int,
  info struct<lat:double, lon:double>
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':'
stored as textfile;

-- sample data line: 11111,1.2:2.3
LOAD DATA LOCAL INPATH '/home/master/test5.txt' INTO TABLE student_test;

CREATE EXTERNAL TABLE IF NOT EXISTS temp.zyy_es_point (
  id bigint,
  shop_location struct<lat:double, lon:double>
)
ROW FORMAT SERDE 'org.elasticsearch.hadoop.hive.EsSerDe'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
  'es.mapping.id'='id',
  'es.nodes'='127.0.0.1',
  'es.resource'='prophet/shop'
);

insert into table temp.zyy_es_point select id, info from student_test;
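Why this works, as a small Python sketch (illustrative only, using the sample row above): the Hive struct<lat:double, lon:double> serializes to exactly the JSON object shape that a geo_point mapping accepts.

```python
import json

# The struct<lat:double, lon:double> column serializes to a JSON object of
# exactly the shape a geo_point mapping accepts (sample row: 11111,1.2:2.3).
doc = {"id": 11111, "shop_location": {"lat": 1.2, "lon": 2.3}}
print(json.dumps(doc))
# {"id": 11111, "shop_location": {"lat": 1.2, "lon": 2.3}}
```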


(Rahul ) #17

Yes, as you correctly mentioned, I went through the previous post and did see that the Secondary NameNode is running. To the best of my knowledge, there are two types when it comes to Hadoop's high-availability feature:

  1. Active transfer (two active NameNodes, one active and the other an active standby, and two ResourceManagers, one active and the other an active standby)
  2. Passive transfer (one active NameNode plus a backup, or secondary, NameNode)

In the scenario on your system:

The Secondary NameNode is running,
but the active NameNode is not running.

There is a difference between the Secondary NameNode and the active NameNode.

So make sure that the active NameNode is running.

Check the Hadoop logs to ensure the NameNode is running.

If you use start-all.sh to start the Hadoop cluster, there should be 5 basic services visible after running jps:

HDFS (start-dfs.sh):

  1. NameNode (leader)
  2. Secondary NameNode (backup service)
  3. DataNode (slave)

Processing system for HDFS (start-yarn.sh):
  4. ResourceManager (leader)
  5. NodeManager (slave)