Hi,
We are trying to ingest data from Hive into Elasticsearch and ran into issues because a few of the records are malformed JSON.
We followed the documentation on handling bad records and created the DDL script below to ingest the data.
We are using Elasticsearch version 6.8.0.
hive> select * from provider1;
OK
{"id","k11",}
{"id","k12",}
{"id","k13",}
{"id","k14",}
{"id":"K1","name":"Ravi","salary":500}
{"id":"K2","name":"Ravi","salary":500}
{"id":"K3","name":"Ravi","salary":500}
{"id":"K4","name":"Ravi","salary":500}
{"id":"K5","name":"Ravi","salary":500}
{"id":"K6","name":"Ravi","salary":"sdfgg"}
{"id":"K7","name":"Ravi","salary":"sdf"}
{"id":"k8"}
{"id":"K9","name":"r1","salary":522}
{"id":"k10","name":"r2","salary":53}
Time taken: 0.179 seconds, Fetched: 14 row(s)
ADD JAR /home/smrafi/elasticsearch-hadoop-6.8.0/dist/elasticsearch-hadoop-6.8.0.jar;
CREATE EXTERNAL TABLE hive_es_with_handler (data STRING)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
  'es.resource' = 'test_eshadoop/healthCareProvider',
  'es.nodes' = 'xyz',
  'es.input.json' = 'yes',
  'es.index.auto.create' = 'true',
  'es.write.operation' = 'upsert',
  'es.nodes.wan.only' = 'true',
  'es.port' = '443',
  'es.net.ssl' = 'true',
  'es.batch.size.entries' = '1',
  'es.mapping.id' = 'id',
  'es.batch.write.retry.count' = '-1',
  'es.batch.write.retry.wait' = '60s',
  'es.write.rest.error.handlers' = 'es, ignoreBadRecords',
  'es.write.data.error.handlers' = 'customLog',
  'es.write.data.error.handler.customLog' = 'com.xyz.elshandler.CustomLogOnError',
  'es.write.rest.error.handler.es.client.resource' = 'error_es_index/error',
  'es.write.rest.error.handler.es.return.default' = 'HANDLED',
  'es.write.rest.error.handler.log.logger.name' = 'BulkErrors',
  'es.write.data.error.handler.log.logger.name' = 'SerializationErrors',
  'es.write.rest.error.handler.ignoreBadRecords' = 'com.verisys.elshandler.IgnoreBadRecordHandler',
  'es.write.rest.error.handler.es.return.error' = 'HANDLED'
);
INSERT INTO hive_es_with_handler SELECT * FROM provider1;
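For reference, the ignoreBadRecords handler registered above is essentially a pass-through that acknowledges every rejected document. A minimal sketch of that kind of handler, assuming the BulkWriteErrorHandler / HandlerResult API from the es-hadoop error-handler documentation (package and method names taken from those docs, so the actual class may differ in detail):

package com.verisys.elshandler;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

// Bulk-write error handler sketch: mark every failed document as handled
// so es-hadoop drops it and continues instead of aborting the job.
public class IgnoreBadRecordHandler extends BulkWriteErrorHandler {
    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        return HandlerResult.HANDLED;
    }
}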
Below is the exception trace; the job failed, complaining that the error-handler index is not present:
Caused by: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: org.codehaus.jackson.JsonParseException: Unexpected character (',' (code 44)): was expecting a colon to separate field name and value
at [Source: [B@1e3f0aea; line: 1, column: 7]
at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:95)
at org.elasticsearch.hadoop.serialization.ParsingUtils.doFind(ParsingUtils.java:168)
at org.elasticsearch.hadoop.serialization.ParsingUtils.values(ParsingUtils.java:151)
at org.elasticsearch.hadoop.serialization.field.JsonFieldExtractors.process(JsonFieldExtractors.java:213)
at org.elasticsearch.hadoop.serialization.bulk.JsonTemplatedBulk.preProcess(JsonTemplatedBulk.java:64)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:54)
at org.elasticsearch.hadoop.hive.EsSerDe.serialize(EsSerDe.java:171)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:725)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:130)
at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:148)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:550)
... 9 more
Caused by: org.codehaus.jackson.JsonParseException: Unexpected character (',' (code 44)): was expecting a colon to separate field name and value
at [Source: [B@1e3f0aea; line: 1, column: 7]
at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1433)
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:521)
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:442)
at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:500)
at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:93)
... 22 more
I tried to use the custom SerializationErrorHandler, but it has no effect and the handler never comes into play. The job stops completely instead of continuing with the good records, even though the handler returns HANDLED as its default result.
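For completeness, the customLog serialization handler is along these lines. This is a minimal sketch, assuming the SerializationErrorHandler API described in the error-handler documentation (class and package names per those docs; the real class may differ slightly):

package com.xyz.elshandler;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

// Serialization error handler sketch: log the failure and report it as handled.
// Assumption: returning HANDLED should let the job continue with the remaining records.
public class CustomLogOnError extends SerializationErrorHandler {
    private static final Log LOG = LogFactory.getLog(CustomLogOnError.class);

    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector) throws Exception {
        LOG.error("Could not serialize record. Skipping it.", entry.getException());
        return HandlerResult.HANDLED;
    }
}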