ES 6.3.0 Duplicate field message generated in BulkProcessor


(garyM) #1

The BulkProcessor is repeatedly calling the overridden "afterBulk" failure method we defined.

Here is our BulkProcessor Listener() code:
if (bulkProcessor == null) {

        bulkProcessor = BulkProcessor.builder(
                rhlClient::bulkAsync,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {
                         LOG.debug("some significant message here");
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        LOG.debug("some significant message here");
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                        LOG.error("THIS OVERRIDE KEEPS GETTING CALLED");

                    }
                })
                .setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(100, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .build();
    }

This is the message we are seeing:

Duplicate field 'latlon'
at [Source: (org.elasticsearch.common.bytes.BytesReference$MarkSupportingStreamInputWrapper); line: 1, column: 1201]

Here is the XContentBuilder definition we use to set up our index:

            XContentBuilder builder = jsonBuilder()
                .startObject()
                    .startObject(RESOURCE_TYPE)
                        .startObject("properties")
                            .startObject("latlon").field("type", "geo_point").endObject()
                            .startObject("path").field("type", "text").field("analyzer", "path-analyzer")
                                                .field("fielddata", true).endObject()
                            .startObject("emailString").field("type", "text")
                                                       .field("analyzer", "url-email-analyzer").endObject()
                            .startObject("fullPath").field("type", "keyword").field("index", "true").endObject()
                            .startObject("statesKW").field("type", "text")
                                                    .field("analyzer", "keyword").field("fielddata", true).endObject()
                            .startObject("audiences").field("type", "text")
                                                     .field("analyzer", "keyword").field("fielddata", true).endObject()
                            .startObject("orgTypes").field("type", "text").field("analyzer", "keyword")
                                                    .field("fielddata", true).endObject()
                            .startObject("suggest").field("type", "completion").endObject()
                            .startObject("linkUrl").field("type", "text")
                                                   .field("analyzer", "domain_name_analyzer").endObject()
                            .startObject("linkText").field("type", "text")
                                                    .field("analyzer", "domain_name_analyzer").endObject()
                            .startObject("details").field("type", "text")
                                                   .field("analyzer", "domain_name_analyzer").endObject()
                            .startObject("legalName").field("type", "text").endObject()
                        .endObject()
                    .endObject()
                .endObject();

Now, in the code that performs the indexing, we have this XContentBuilder definition:

            XContentBuilder builder = jsonBuilder()
                .startObject()
                .field("id", Long.toString(resource.getId()))
                .field("linkText", resource.getLinkText())
                .field("linkUrl", resource.getLinkUrl())
                .field("domain", resource.getDomain())
                .field("creatorId", resource.getCreatorId())
                .field("createDate",
                        null == resource.getCreateDate()? null : sdf.format(resource.getCreateDate()))
                .field("startDate",
                        null == resource.getStartDate()? null : sdf.format(resource.getStartDate()))
                .field("endDate",
                        null == resource.getEndDate()? null : sdf.format(resource.getEndDate()))
                .field("markedNewEndDate",
                        null == resource.getMarkedNewEndDate()? null : 
                              sdf.format(resource.getMarkedNewEndDate()))
                .field("organizationType", resource.getOrganizationType())
                .field("mediaItem", resource.getMediaItem())
                .field("isPublic", resource.getIsPublic())
                .field("isArchived", resource.getIsArchived())
                .field("details", resource.getDetails())
                .field("emailString", resource.getEmailsAsString())
                .field("ein", resource.getEin())
                .field("lastUpdate", null == lastUpdate? null : sdf.format(lastUpdate))
                .field("lastRevetDate", null == lastRevetDate? null : sdf.format(lastRevetDate))
                .field("isLegalNameFromTitle", resource.getIsLegalNameFromTitle())
                .field("legalName", resource.getLegalName());
        suggestionBuilder.add(resource.getDetails()).add(resource.getLinkText());

...and further into the indexing code we build an array for each "location" as follows:

            locations.stream().forEach(location -> {
                try {
                    builder.startArray("latlon");
                    builder.startObject();
                    builder.field("lat", location.getLatitude());
                    builder.field("lon", location.getLongitude());
                    ...
                    other necessary code here
                    ...
                    builder.endObject();
                    builder.endArray();
                } catch (IOException e) {
                    LOG.info(e);
                }
            });
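For reference, here is a minimal, self-contained sketch (plain Java, no Elasticsearch classes; the `Location` record and the coordinate values are made up for illustration) of the JSON shape a loop like the one above emits when the array is opened inside the forEach: the "latlon" key is repeated once per location.

```java
import java.util.List;

// Sketch of the JSON a loop produces when it opens "latlon" once per
// location instead of once per document. The Location record and the
// values below are illustrative only, not from the original code.
public class DuplicateLatlonSketch {
    record Location(double lat, double lon) {}

    static String buggyJson(List<Location> locations) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Location loc : locations) {
            if (!first) sb.append(',');
            first = false;
            // the key is emitted once per location -- this is the bug
            sb.append("\"latlon\":[{\"lat\":").append(loc.lat())
              .append(",\"lon\":").append(loc.lon()).append("}]");
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        String json = buggyJson(List.of(new Location(40.7, -74.0),
                                        new Location(34.1, -118.2)));
        System.out.println(json);
        // {"latlon":[{"lat":40.7,"lon":-74.0}],"latlon":[{"lat":34.1,"lon":-118.2}]}
    }
}
```

A strict JSON parser rejects the duplicate "latlon" key in that output, which matches the "Duplicate field 'latlon'" failure reported above.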

Could this "Duplicate field 'latlon'" afterBulk failure message be generated because we have more than one "latlon" array being defined?

FYI: We were not seeing this in ES Version 5.
Can somebody please advise?
Thank you
Gary


(David Pilato) #2

I think you should write something like:

builder.startArray("latlon");       // open the array once, before the loop
locations.stream().forEach(location -> {
    try {
        builder.startObject();
        builder.field("lat", location.getLatitude());
        builder.field("lon", location.getLongitude());
        ...
        other necessary code here
        ...
        builder.endObject();
    } catch (IOException e) {
        LOG.info(e);
    }
});
builder.endArray();                 // close it once, after all locations
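A minimal plain-Java sketch (no Elasticsearch classes; the `Location` record and the values are made up for illustration) of the JSON shape this ordering produces: a single "latlon" key whose array holds one object per location.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the JSON shape when the array is opened once before the loop
// and closed once after it: the "latlon" key appears exactly once, so a
// strict JSON parser accepts the document. Location and the coordinates
// below are illustrative only.
public class SingleLatlonSketch {
    record Location(double lat, double lon) {}

    static String fixedJson(List<Location> locations) {
        String entries = locations.stream()
            .map(loc -> "{\"lat\":" + loc.lat() + ",\"lon\":" + loc.lon() + "}")
            .collect(Collectors.joining(","));
        return "{\"latlon\":[" + entries + "]}";
    }

    public static void main(String[] args) {
        System.out.println(fixedJson(List.of(new Location(40.7, -74.0),
                                             new Location(34.1, -118.2))));
        // {"latlon":[{"lat":40.7,"lon":-74.0},{"lat":34.1,"lon":-118.2}]}
    }
}
```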

(garyM) #3

Well, thank you again, sir. Yes, that fixed it, and further analysis uncovered a bug in our code that has been there for a while.

Now I need to inquire about my log file being slammed with over 100 KB of messages from this bulk process, which I will address in another posting.

Thanks again.
Gary


(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.