Hive-Elasticsearch Error Handler for Malformed Records

Hi,

We are trying to ingest data from Hive into Elasticsearch. We had issues ingesting the data, and a few of the records are malformed JSON.

We followed the documentation below for handling bad records and created a DDL script to ingest the data.

We are using Elasticsearch version 6.8.0.

hive> select * from provider1;
OK
{"id","k11",}
{"id","k12",}
{"id","k13",}
{"id","k14",}
{"id":"K1","name":"Ravi","salary":500}
{"id":"K2","name":"Ravi","salary":500}
{"id":"K3","name":"Ravi","salary":500}
{"id":"K4","name":"Ravi","salary":500}
{"id":"K5","name":"Ravi","salary":500}
{"id":"K6","name":"Ravi","salary":"sdfgg"}
{"id":"K7","name":"Ravi","salary":"sdf"}
{"id":"k8"}
{"id":"K9","name":"r1","salary":522}
{"id":"k10","name":"r2","salary":53}
Time taken: 0.179 seconds, Fetched: 14 row(s)

ADD JAR /home/smrafi/elasticsearch-hadoop-6.8.0/dist/elasticsearch-hadoop-6.8.0.jar;
CREATE external TABLE hive_es_with_handler( data STRING)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
  'es.resource' = 'test_eshadoop/healthCareProvider',
  'es.nodes' = 'xyz',
  'es.input.json' = 'yes',
  'es.index.auto.create' = 'true',
  'es.write.operation' = 'upsert',
  'es.nodes.wan.only' = 'true',
  'es.port' = '443',
  'es.net.ssl' = 'true',
  'es.batch.size.entries' = '1',
  'es.mapping.id' = 'id',
  'es.batch.write.retry.count' = '-1',
  'es.batch.write.retry.wait' = '60s',
  'es.write.rest.error.handlers' = 'es, ignoreBadRecords',
  'es.write.data.error.handlers' = 'customLog',
  'es.write.data.error.handler.customLog' = 'com.xyz.elshandler.CustomLogOnError',
  'es.write.rest.error.handler.es.client.resource' = 'error_es_index/error',
  'es.write.rest.error.handler.es.return.default' = 'HANDLED',
  'es.write.rest.error.handler.log.logger.name' = 'BulkErrors',
  'es.write.data.error.handler.log.logger.name' = 'SerializationErrors',
  'es.write.rest.error.handler.ignoreBadRecords' = 'com.verisys.elshandler.IgnoreBadRecordHandler',
  'es.write.rest.error.handler.es.return.error' = 'HANDLED');
insert into hive_es_with_handler select * from provider1;
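
For context, a bulk-write (REST) error handler of this kind looks roughly like the following. This is only a minimal sketch based on the es-hadoop 6.8 error handler API (the logging details are simplified and our actual IgnoreBadRecordHandler differs); note that a handler registered under es.write.rest.error.handlers only sees documents that Elasticsearch itself rejected after they were sent, such as the rows with a string salary value.

package com.verisys.elshandler;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

// Minimal sketch of a bulk-write error handler: log the rejected document and
// acknowledge it as handled so the rest of the batch keeps flowing.
public class IgnoreBadRecordHandler extends BulkWriteErrorHandler {

    private static final Log LOGGER = LogFactory.getLog(IgnoreBadRecordHandler.class);

    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector)
            throws Exception {
        // entry describes a document that Elasticsearch rejected during the bulk request,
        // e.g. a mapping conflict such as salary = "sdfgg" against a numeric field.
        LOGGER.warn("Dropping rejected document (HTTP " + entry.getResponseCode() + ")",
                entry.getException());
        return HandlerResult.HANDLED;
    }
}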

Below is the exception trace. The job failed, complaining that the error handler index is not present:

Caused by: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: org.codehaus.jackson.JsonParseException: Unexpected character (',' (code 44)): was expecting a colon to separate field name and value
 at [Source: [B@1e3f0aea; line: 1, column: 7]
	at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:95)
	at org.elasticsearch.hadoop.serialization.ParsingUtils.doFind(ParsingUtils.java:168)
	at org.elasticsearch.hadoop.serialization.ParsingUtils.values(ParsingUtils.java:151)
	at org.elasticsearch.hadoop.serialization.field.JsonFieldExtractors.process(JsonFieldExtractors.java:213)
	at org.elasticsearch.hadoop.serialization.bulk.JsonTemplatedBulk.preProcess(JsonTemplatedBulk.java:64)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:54)
	at org.elasticsearch.hadoop.hive.EsSerDe.serialize(EsSerDe.java:171)
	at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:725)
	at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
	at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
	at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
	at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:130)
	at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:148)
	at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:550)
	... 9 more
Caused by: org.codehaus.jackson.JsonParseException: Unexpected character (',' (code 44)): was expecting a colon to separate field name and value
 at [Source: [B@1e3f0aea; line: 1, column: 7]
	at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1433)
	at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:521)
	at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:442)
	at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:500)
	at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:93)
	... 22 more

I tried to use a custom SerializationErrorHandler, but it is of no use: the handler never comes into play, and the job stops completely instead of continuing with the good records, even after setting the default return to the HANDLED constant.

It looks like you are using the AWS-provided Elasticsearch. You will need to ask them about this, as it is a fork that they run and we do not know what changes they have made to the product.

Thanks Mark. I have seen the documentation; it says the following:

Serialization Error Handlers

Before sending data to Elasticsearch, elasticsearch-hadoop must serialize each document into a JSON bulk entry. It is during this process that the bulk operation is determined, document metadata is extracted, and integration specific data structures are converted into JSON documents. During this process, inconsistencies with record structure can cause exceptions to be thrown during the serialization process. These errors often lead to failed tasks and halted processing.

Elasticsearch for Apache Hadoop provides an API to handle serialization errors at the record level. Error handlers for serialization are given:

  • The integration specific data structure that was unable to be serialized
  • Exception encountered during serialization

Serialization Error Handlers are not yet available for Hive. Elasticsearch for Apache Hadoop uses Hive’s SerDe constructs to convert data into bulk entries before being sent to the output format. SerDe objects do not have a cleanup method that is called when the object ends its lifecycle. Because of this, we do not support serialization error handlers in Hive as they cannot be closed at the end of the job execution.

The handlers were working for wrong value types inside the documents (for example the string salary values); those errors were properly caught by the custom error handler. I assumed malformed JSON documents would be treated the same way, but after checking, they fail during a pre-processing step (field extraction) before any handler runs.
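
Since serialization error handlers are not supported for Hive, the only workaround I can see is to keep malformed documents away from EsSerDe altogether, for example by filtering on get_json_object, which returns NULL for input it cannot parse. A sketch along these lines (the column name is illustrative; adjust it to the actual provider1 schema):

-- assuming provider1's single JSON string column is called line;
-- get_json_object returns NULL for documents it cannot parse, so the broken
-- {"id","k11",} style rows never reach EsSerDe.serialize
insert into hive_es_with_handler
select line from provider1
where get_json_object(line, '$.id') is not null;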
