The BulkProcessor is repeatedly calling the overridden "afterBulk" failure method we defined.
Here is our BulkProcessor.Listener code:
if (bulkProcessor == null) {
    bulkProcessor = BulkProcessor.builder(
        rhlClient::bulkAsync,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                LOG.debug("some significant message here");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                LOG.debug("some significant message here");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                LOG.error("THIS OVERRIDE KEEPS GETTING CALLED", failure);
            }
        })
        .setBulkActions(1000)
        .setBulkSize(new ByteSizeValue(100, ByteSizeUnit.MB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(1)
        .build();
}
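The Throwable handed to the failure overload is often a wrapper, so logging only its own message can hide the underlying parse error. A minimal, dependency-free sketch of walking the cause chain to surface the root cause (the `RootCause` class and `rootCause` method are hypothetical helper names, not part of the Elasticsearch API):

```java
// Hypothetical helper: unwrap a Throwable's cause chain so the afterBulk
// failure override can log the underlying error, not just the wrapper.
public class RootCause {
    public static Throwable rootCause(Throwable t) {
        Throwable cur = t;
        // Follow getCause() until it bottoms out (guarding against self-cycles).
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur;
    }

    public static void main(String[] args) {
        Throwable inner = new IllegalStateException("Duplicate field 'latlon'");
        Throwable outer = new RuntimeException("bulk request failed", inner);
        System.out.println(rootCause(outer).getMessage()); // prints the inner message
    }
}
```

Logging `rootCause(failure)` in the afterBulk failure override would show the "Duplicate field" message directly.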
This is the message we are seeing:
Duplicate field 'latlon'
at [Source: (org.elasticsearch.common.bytes.BytesReference$MarkSupportingStreamInputWrapper); line: 1, column: 1201]
Here is the XContentBuilder definition we use to set up our index mapping:
XContentBuilder builder = jsonBuilder()
    .startObject()
        .startObject(RESOURCE_TYPE)
            .startObject("properties")
                .startObject("latlon").field("type", "geo_point").endObject()
                .startObject("path").field("type", "text").field("analyzer", "path-analyzer")
                    .field("fielddata", true).endObject()
                .startObject("emailString").field("type", "text")
                    .field("analyzer", "url-email-analyzer").endObject()
                .startObject("fullPath").field("type", "keyword").field("index", "true").endObject()
                .startObject("statesKW").field("type", "text")
                    .field("analyzer", "keyword").field("fielddata", true).endObject()
                .startObject("audiences").field("type", "text")
                    .field("analyzer", "keyword").field("fielddata", true).endObject()
                .startObject("orgTypes").field("type", "text").field("analyzer", "keyword")
                    .field("fielddata", true).endObject()
                .startObject("suggest").field("type", "completion").endObject()
                .startObject("linkUrl").field("type", "text")
                    .field("analyzer", "domain_name_analyzer").endObject()
                .startObject("linkText").field("type", "text")
                    .field("analyzer", "domain_name_analyzer").endObject()
                .startObject("details").field("type", "text")
                    .field("analyzer", "domain_name_analyzer").endObject()
                .startObject("legalName").field("type", "text").endObject()
            .endObject()
        .endObject()
    .endObject();
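For reference, that builder should serialize to a mapping along these lines (RESOURCE_TYPE stands for our type constant's value; only the first few fields are shown), with "latlon" defined exactly once:

```
{
  "RESOURCE_TYPE": {
    "properties": {
      "latlon":   { "type": "geo_point" },
      "path":     { "type": "text", "analyzer": "path-analyzer", "fielddata": true },
      "fullPath": { "type": "keyword", "index": "true" }
    }
  }
}
```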
In the code that performs the indexing we have this XContentBuilder definition:
XContentBuilder builder = jsonBuilder()
    .startObject()
    .field("id", Long.toString(resource.getId()))
    .field("linkText", resource.getLinkText())
    .field("linkUrl", resource.getLinkUrl())
    .field("domain", resource.getDomain())
    .field("creatorId", resource.getCreatorId())
    .field("createDate",
        null == resource.getCreateDate() ? null : sdf.format(resource.getCreateDate()))
    .field("startDate",
        null == resource.getStartDate() ? null : sdf.format(resource.getStartDate()))
    .field("endDate",
        null == resource.getEndDate() ? null : sdf.format(resource.getEndDate()))
    .field("markedNewEndDate",
        null == resource.getMarkedNewEndDate() ? null
            : sdf.format(resource.getMarkedNewEndDate()))
    .field("organizationType", resource.getOrganizationType())
    .field("mediaItem", resource.getMediaItem())
    .field("isPublic", resource.getIsPublic())
    .field("isArchived", resource.getIsArchived())
    .field("details", resource.getDetails())
    .field("emailString", resource.getEmailsAsString())
    .field("ein", resource.getEin())
    .field("lastUpdate", null == lastUpdate ? null : sdf.format(lastUpdate))
    .field("lastRevetDate", null == lastRevetDate ? null : sdf.format(lastRevetDate))
    .field("isLegalNameFromTitle", resource.getIsLegalNameFromTitle())
    .field("legalName", resource.getLegalName());
suggestionBuilder.add(resource.getDetails()).add(resource.getLinkText());
Further into the indexing code, we build an array for each "location" as follows:
locations.stream().forEach(location -> {
    try {
        builder.startArray("latlon");
        builder.startObject();
        builder.field("lat", location.getLatitude());
        builder.field("lon", location.getLongitude());
        ...
        other necessary code here
        ...
        builder.endObject();
        builder.endArray();
    } catch (IOException e) {
        LOG.error(e);
    }
});
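Because startArray("latlon") sits inside the forEach, each location opens (and closes) its own "latlon" array, so a document with two locations serializes the "latlon" key twice, which matches the error above under Elasticsearch 6's stricter duplicate-key parsing. A minimal plain-Java sketch of the two shapes (plain strings, no Elasticsearch dependency; class and method names are illustrative):

```java
import java.util.List;

public class DuplicateLatlon {
    // Mimics the current forEach: one "latlon" array per location.
    public static String perLocationArrays(List<double[]> locs) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < locs.size(); i++) {
            if (i > 0) sb.append(",");
            double[] l = locs.get(i);
            sb.append("\"latlon\":[{\"lat\":").append(l[0])
              .append(",\"lon\":").append(l[1]).append("}]");
        }
        return sb.toString();
    }

    // Opens the array once, then appends one object per location.
    public static String singleArray(List<double[]> locs) {
        StringBuilder sb = new StringBuilder("\"latlon\":[");
        for (int i = 0; i < locs.size(); i++) {
            if (i > 0) sb.append(",");
            double[] l = locs.get(i);
            sb.append("{\"lat\":").append(l[0])
              .append(",\"lon\":").append(l[1]).append("}");
        }
        return sb.append("]").toString();
    }

    // Counts occurrences of a key in the serialized output.
    public static int count(String s, String key) {
        int n = 0, i = 0;
        while ((i = s.indexOf(key, i)) >= 0) { n++; i += key.length(); }
        return n;
    }

    public static void main(String[] args) {
        List<double[]> locs = List.of(new double[]{1, 2}, new double[]{3, 4});
        System.out.println(perLocationArrays(locs)); // "latlon" key appears twice
        System.out.println(singleArray(locs));       // "latlon" key appears once
    }
}
```

If the intent is one geo_point array per document, moving builder.startArray("latlon") before the loop and builder.endArray() after it would produce the single-key shape.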
Could this "Duplicate field 'latlon'" afterBulk failure message be generated because we write more than one "latlon" array into the same document?
FYI: We were not seeing this in ES Version 5.
Can somebody please advise?
Thank you
Gary