java.lang.IllegalStateException: Failed to close the XContentBuilder (Elasticsearch 6.8.1)

Hi, I am consuming data from Apache Kafka, transforming it with Apache Flink, and then inserting it into Elasticsearch. However, I get the following error and my Flink job fails. Can someone tell me why I am facing this issue? When does this error occur, and how can I handle it?
Thanks in advance.

Kafka Version - 1.0
Flink Version - 1.6.3
Elasticsearch Version - 6.8.1

Flink Elasticsearch connector code:

ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        // Builds an index request from a raw JSON string; returns null on failure.
        public IndexRequest createIndexRequest(String element) {
            try {
                return Requests.indexRequest()
                    .index(indexName)
                    .type("_doc")
                    .source(element, XContentType.JSON);
            } catch (Exception e) {
                System.out.println("Exception in Creating Elasticsearch Index Request :: " + e.getMessage());
                e.printStackTrace();
                return null;
            }
        }

        @Override
        public void process(String element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            try {
                // Skip blank records.
                if (!element.trim().isEmpty() && requestIndexer != null) {
                    requestIndexer.add(createIndexRequest(element));
                }
            } catch (Exception e) {
                System.out.println("Exception in adding documents..");
                e.printStackTrace();
            }
        }
    }
);

Error -
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:379)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:304)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input in VALUE_STRING
at [Source: UNKNOWN; line: 1, column: 2373]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:458)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:454)
at com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:507)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2453)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2400)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getTextCharacters(UTF8StreamJsonParser.java:443)
at com.fasterxml.jackson.core.JsonGenerator.copyCurrentEvent(JsonGenerator.java:1546)
at com.fasterxml.jackson.core.JsonGenerator.copyCurrentStructure(JsonGenerator.java:1652)
at com.fasterxml.jackson.core.JsonGenerator.copyCurrentStructure(JsonGenerator.java:1640)
at org.elasticsearch.common.xcontent.json.JsonXContentGenerator.copyCurrentStructure(JsonXContentGenerator.java:393)
at org.elasticsearch.common.xcontent.XContentBuilder.copyCurrentStructure(XContentBuilder.java:910)
at org.elasticsearch.client.Request.bulk(Request.java:371)
at org.elasticsearch.client.RestHighLevelClient.performRequestAsync(RestHighLevelClient.java:564)
at org.elasticsearch.client.RestHighLevelClient.performRequestAsyncAndParseEntity(RestHighLevelClient.java:549)
at org.elasticsearch.client.RestHighLevelClient.bulkAsync(RestHighLevelClient.java:268)
at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.lambda$createBulkProcessorBuilder$0(Elasticsearch6ApiCallBridge.java:94)
at org.elasticsearch.action.bulk.Retry$RetryHandler.execute(Retry.java:202)
at org.elasticsearch.action.bulk.Retry.withBackoff(Retry.java:62)
at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:63)
at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:306)
... 7 more
Suppressed: java.lang.IllegalStateException: Failed to close the XContentBuilder
at org.elasticsearch.common.xcontent.XContentBuilder.close(XContentBuilder.java:924)
at org.elasticsearch.client.Request.$closeResource(Request.java:360)
at org.elasticsearch.client.Request.bulk(Request.java:373)
... 24 more
Caused by: java.io.IOException: Unclosed object or array found
at org.elasticsearch.common.xcontent.json.JsonXContentGenerator.close(JsonXContentGenerator.java:444)
at org.elasticsearch.common.xcontent.XContentBuilder.close(XContentBuilder.java:922)
... 26 more

Hi @aditshrimal7,

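I would start by double-checking that the element passed in this line:

.source(element, XContentType.JSON);

is valid JSON, with all strings, curly braces, and arrays closed. The "Unexpected end-of-input in VALUE_STRING" at column 2373 in your stack trace suggests that the document was cut off in the middle of a string value.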
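As a rough illustration of that check, here is a minimal sketch that tests whether a record looks truncated before it is handed to the sink. The class name `JsonTruncationCheck` and the method `looksComplete` are hypothetical, and this is only a structural check (every string, object, and array that is opened is also closed), not a full JSON validation. A real JSON parser would be a stricter choice if one is available on your classpath.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class JsonTruncationCheck {

    /**
     * Returns true if every string, object, and array opened in the text is
     * also closed. This only detects truncated or mismatched structure; it
     * does not validate keys, values, commas, or colons.
     */
    public static boolean looksComplete(String json) {
        if (json == null || json.trim().isEmpty()) {
            return false;
        }
        Deque<Character> open = new ArrayDeque<>(); // stack of unclosed '{' and '['
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                if (escaped) {
                    escaped = false;       // character after a backslash is literal
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;      // closing quote of the string
                }
                continue;
            }
            switch (c) {
                case '"':
                    inString = true;
                    break;
                case '{':
                case '[':
                    open.push(c);
                    break;
                case '}':
                    if (open.isEmpty() || open.pop() != '{') {
                        return false;
                    }
                    break;
                case ']':
                    if (open.isEmpty() || open.pop() != '[') {
                        return false;
                    }
                    break;
                default:
                    break;
            }
        }
        // Truncated input ends inside a string or with unclosed brackets.
        return !inString && open.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(looksComplete("{\"msg\":\"ok\"}"));   // prints true
        System.out.println(looksComplete("{\"msg\":\"cut off")); // prints false
    }
}
```

One way to use it would be to call `looksComplete(element)` at the top of `process(...)` and log and skip records that fail, so you can see which Kafka messages arrive incomplete.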

Also pay attention to the version of the REST high-level client, and make sure it is compatible with your Elasticsearch 6.8.1 cluster.
