We have an index named "partylogins" that we are bulk loading through the
Java API. The index has 4 shards and no replicas during the bulk load.
However, every document we index ends up in shard 3. The _routing field is
not set up on this index, and we are not using any aliases.
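For context, my understanding is that when no routing is supplied,
Elasticsearch hashes the document id to pick the shard, so distinct ids
should spread across all 4 shards. Below is a small sketch of that
selection logic as I understand it for the pre-2.0 releases; the DJB2 hash
is what I believe DjbHashFunction uses, and the class/method names here
are my own:

// Sketch of default shard selection as I understand it (pre-2.0
// Elasticsearch): shard = djb2(routing) mod numberOfShards, where the
// routing value defaults to the document id. All names below are mine.
public final class ShardMath {

    // DJB2 string hash; I believe this mirrors Elasticsearch's
    // DjbHashFunction from the 0.90/1.x line.
    static int djb2(String value) {
        int hash = 5381;
        for (int i = 0; i < value.length(); i++) {
            hash = ((hash << 5) + hash) + value.charAt(i); // hash * 33 + c
        }
        return hash;
    }

    static int shardFor(String id, int numberOfShards) {
        // Floor-style mod so a negative hash still maps into 0..n-1
        return ((djb2(id) % numberOfShards) + numberOfShards) % numberOfShards;
    }

    public static void main(String[] args) {
        // With 4 shards, distinct ids should normally land on different shards
        for (String id : new String[] { "a", "b", "c", "d", "e" }) {
            System.out.println(id + " -> shard " + shardFor(id, 4));
        }
    }
}

If that is roughly right, I don't see how every id we generate could hash
to the same shard.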
The code we are using for bulk loading the documents is:
private void saveSpecRecords(final String index, final List<Record> records) throws Exception {
    Client client = getClient();
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
    bulkRequest.setReplicationType(ReplicationType.ASYNC);
    bulkRequest.setRefresh(false); // DEFAULT_REFRESH
    long size = 0L;
    for (Record record : records) {
        Map<String, Object> source_fields = record.getSource();
        IndexRequestBuilder doc = client.prepareIndex(index, DEFAULT_DOC_TYPE, record.getId());
        doc.setConsistencyLevel(WriteConsistencyLevel.ONE);
        doc.setRefresh(false); // DEFAULT_REFRESH
        //doc.setCreate(true); // DEFAULT_CREATE
        doc.setReplicationType(ReplicationType.ASYNC);
        // Remove the id field from the source and set the source
        source_fields.remove(XmlReader.ID_FIELD);
        doc.setSource(source_fields);
        logger.debug("record id: " + record.getId());
        //logger.debug("source keys: " + source_fields.keySet());
        // Accumulate the source sizes so we can log the average below
        size += record.getJsonString().length();
        bulkRequest.add(doc);
    }
    logger.debug("record source size avg (KB): " + (size / records.size() / 1024));
    ListenableActionFuture<BulkResponse> lbr = bulkRequest.execute();
    logger.debug("start actionGet() for BulkResponse");
    BulkResponse response = lbr.actionGet();
    logger.debug("end actionGet() for BulkResponse");
    if (response.hasFailures()) {
        logger.error("found the following errors while processing a bulk insert");
        for (BulkItemResponse item : response.items()) {
            if (item.failed()) {
                logger.debug(item.getIndex());
                logger.debug(item.getId());
                logger.error(item.failureMessage());
            }
        }
        throw new Exception("error bulk copying data into spec index " + index);
    }
}
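In case it helps, this is roughly how I check the per-shard doc counts,
using the 1.x-era indices stats API from the Java client (a sketch only;
the exact getter names are from memory, so treat them as assumptions):

import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;

public class ShardDocCounts {
    // Print the doc count held by each shard of the index; this is how
    // we see everything sitting in shard 3.
    static void printShardDocCounts(Client client, String index) {
        IndicesStatsResponse stats = client.admin().indices()
                .prepareStats(index).execute().actionGet();
        for (ShardStats shard : stats.getShards()) {
            System.out.println("shard " + shard.getShardRouting().shardId()
                    + " docs: " + shard.getStats().getDocs().getCount());
        }
    }
}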
What am I missing? Any idea why all of the documents end up on a single
shard would be much appreciated.
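One experiment I am considering, to rule routing in or out, is setting the
routing explicitly on each request inside the loop above; routing on the
id should behave exactly like the default (sketch only):

// Possible experiment: force routing explicitly per document. Routing on
// the id should be equivalent to the default behavior, so if this changes
// the distribution, something is overriding routing elsewhere.
IndexRequestBuilder doc = client.prepareIndex(index, DEFAULT_DOC_TYPE, record.getId())
        .setRouting(record.getId()) // explicit routing on the document id
        .setSource(source_fields);
bulkRequest.add(doc);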