Hi, I am consuming data from Apache Kafka and transforming it with Apache Flink, and then inserting it in Elasticsearch. But I get the following errors and my flink job fails. Please can someone tell me why am I facing this issue ? When does this error occur? How can I handle this ?
Thanks in advance.
Kafka Version - 1.0
Flink Version - 1.6.3
Elasticsearch Version - 6.8.1
ES-Connector code of flink -
ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder(
httpHosts,
new ElasticsearchSinkFunction() {
public IndexRequest createIndexRequest(String element) {
try{
return Requests.indexRequest()
.index(indexName)
.type("_doc")
.source(element,XContentType.JSON);
}
catch(Exception e){
System.out.println("Exception in Creating Elasticsearch Index Request :: " + e.getMessage());
e.printStackTrace();
return null;
}
}
@Override
public void process(String element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
try {
if (!element.trim().equalsIgnoreCase("") && requestIndexer != null) {
requestIndexer.add(createIndexRequest(element));
}
}
catch(Exception e){
System.out.println("Exception in adding documents..");
e.printStackTrace();
}
}
}
);
Error -
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:379)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:304)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input in VALUE_STRING
at [Source: UNKNOWN; line: 1, column: 2373]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:458)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:454)
at com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:507)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2453)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2400)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getTextCharacters(UTF8StreamJsonParser.java:443)
at com.fasterxml.jackson.core.JsonGenerator.copyCurrentEvent(JsonGenerator.java:1546)
at com.fasterxml.jackson.core.JsonGenerator.copyCurrentStructure(JsonGenerator.java:1652)
at com.fasterxml.jackson.core.JsonGenerator.copyCurrentStructure(JsonGenerator.java:1640)
at org.elasticsearch.common.xcontent.json.JsonXContentGenerator.copyCurrentStructure(JsonXContentGenerator.java:393)
at org.elasticsearch.common.xcontent.XContentBuilder.copyCurrentStructure(XContentBuilder.java:910)
at org.elasticsearch.client.Request.bulk(Request.java:371)
at org.elasticsearch.client.RestHighLevelClient.performRequestAsync(RestHighLevelClient.java:564)
at org.elasticsearch.client.RestHighLevelClient.performRequestAsyncAndParseEntity(RestHighLevelClient.java:549)
at org.elasticsearch.client.RestHighLevelClient.bulkAsync(RestHighLevelClient.java:268)
at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.lambda$createBulkProcessorBuilder$0(Elasticsearch6ApiCallBridge.java:94)
at org.elasticsearch.action.bulk.Retry$RetryHandler.execute(Retry.java:202)
at org.elasticsearch.action.bulk.Retry.withBackoff(Retry.java:62)
at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:63)
at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:306)
... 7 more
Suppressed: java.lang.IllegalStateException: Failed to close the XContentBuilder
at org.elasticsearch.common.xcontent.XContentBuilder.close(XContentBuilder.java:924)
at org.elasticsearch.client.Request.$closeResource(Request.java:360)
at org.elasticsearch.client.Request.bulk(Request.java:373)
... 24 more
Caused by: java.io.IOException: Unclosed object or array found
at org.elasticsearch.common.xcontent.json.JsonXContentGenerator.close(JsonXContentGenerator.java:444)
at org.elasticsearch.common.xcontent.XContentBuilder.close(XContentBuilder.java:922)
... 26 more