Hi @all,
When I use Flink (1.14.4) to write data to Elasticsearch 8.6.2 with BulkOperation, I get two errors: "Cannot get 'Index' variant: current variant is 'Update'" and "Missing required property 'BulkRequest.operations'". I don't know how to resolve them. I use update because I want to update the document with a script if it exists, and upsert it if it does not. This is the code that sends the bulk request:
try {
    BulkRequest request = bulkRequest.build(); // the "Missing required property" exception is thrown here, while building the operations
    List<BulkOperation> operations = request.operations();
    checkpointState.clear();
    checkpointState.addAll(operations);
    BulkResponse result = esClient.bulk(request);
    if (result != null && result.errors()) {
        contextRetryNum--;
        handlePartialFailures(result);
    }
    System.out.println("Finished");
    break;
} catch (ElasticsearchException ese) {
    LOG.error("write es failed: {}", ese.getMessage());
    contextRetryNum = 0;
} catch (MissingRequiredPropertyException missError) {
    // This exception means there are no operations to perform; timeout exceptions are ignored as well
    LOG.error("es sink error:{}", missError.getMessage());
    contextRetryNum = 0;
} catch (Exception ex) {
    // Wait a while, then retry
    LOG.error("important error:{}", ex.getMessage());
    Thread.sleep(context.getSleepTime());
}
And this is how I build the update operation:
(BulkOperation.Builder) op
    .update(up -> up.id(esId)
        .index(indexName)
        .retryOnConflict(3)
        .action(ac -> ac.script(script -> script.inline(inLineScript -> inLineScript.source(sjson.getString("source"))
                .params(params).lang("painless"))).scriptedUpsert(true)
            .upsert(value.getJSONObject("upsert"))
        ));