Document ALWAYS Routes to ONE Shard when BULK Loading - HELP!


(amos.wood) #1

We have an index named "partylogins" which we are trying to bulk load using
the Java API. The index has 4 shards and no replicas during our bulk load
process. However, all documents being indexed are being saved in shard 3
of the index. The _routing field is not setup on this index and we aren't
using any aliases.

The code we are using for bulk loading the documents is:

private void saveSpecRecords(final String index, final List<Record>records

) throws Exception {
Client client = null;
BulkRequestBuilder bulkRequest = null;
long size = 0l;

    client = getClient();

    bulkRequest = client.prepareBulk();
    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
    bulkRequest.setRefresh(DEFAULT_REFRESH);
    bulkRequest.setReplicationType(ReplicationType.ASYNC);

    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
    bulkRequest.setReplicationType(ReplicationType.ASYNC);
    bulkRequest.setRefresh(false);

    for (Record record : records) {
        Map<String, Object> source_fields = record.getSource();
        IndexRequestBuilder doc = client.prepareIndex(index,DEFAULT_DOC_TYPE

, record.getId());
doc.setConsistencyLevel(WriteConsistencyLevel.ONE);
doc.setRefresh(false); // DEFAULT_REFRESH
//doc.setCreate(true); // DEFAULT_CREATE
doc.setReplicationType(ReplicationType.ASYNC);

        // Remove the id field from the source and set the source
        source_fields.remove(XmlReader.ID_FIELD);
        doc.setSource(source_fields);            
        
        logger.debug("record id: " + record.getId());
        //logger.debug("source keys: " + source_fields.keySet());
        
        // Add the size up to average it out later
        size += record.getJsonString().length();            

        bulkRequest.add(doc);
    }
    
    logger.debug("record source size avg (Kbs): " + (size / records.size

() / 1024));

    ListenableActionFuture<BulkResponse> lbr = bulkRequest.execute();
    logger.debug("start actionGet() for BulkResponse");
    BulkResponse response = lbr.actionGet();
    logger.debug("end actionGet() for BulkResponse");
    if (response.hasFailures()) {
        logger.error("found the following errors while processing a 

bulk insert");
for (BulkItemResponse item : response.items()) {
if (item.failed()) {
logger.debug(item.getIndex());
logger.debug(item.getId());
logger.error(item.failureMessage());
}
}
throw new Exception("error bulk copying data into spec index " +index
);
}
}

What am I missing? Please help if you have an idea of why all of the
documents end up on 1 shard.

--


(amos.wood) #2

Also, I have verified that record.getId() has a UNIQUE Id in each value. A
sample output of my logging looks like:

...

record id: A6855601-7A01-4C20-8F07-AC004BEC415E
record id: 89D5E410-BEFB-461B-958E-0138CE03634E
record id: EBBBF0C1-B049-46E4-9C04-2B23BCCABE88

record source size avg (Kbs): 17

start actionGet() for BulkResponse
end actionGet() for BulkResponse

P.S. I hope that this issue is related to the index speed issue which I am
seeing too. Using this code and the 17Kb documents, I am only able to
INDEX 7 documents/sec which is horribly slow.

--


(David Pilato) #3

I suggest that you execute the bulk request every 100 documents instead of after 17k. That's how rivers are implemented.

Then, try to increase bulk size to 200, 300... And so on.

HTH
David

--
David :wink:
Twitter : @dadoonet / @elasticsearchfr / @scrutmydocs

Le 23 août 2012 à 15:15, "amos.wood" amos.wood@lifeway.com a écrit :

We have an index named "partylogins" which we are trying to bulk load using the Java API. The index has 4 shards and no replicas during our bulk load process. However, all documents being indexed are being saved in shard 3 of the index. The _routing field is not setup on this index and we aren't using any aliases.

The code we are using for bulk loading the documents is:

private void saveSpecRecords(final String index, final List<Record> records) throws Exception {
    Client client = null;
    BulkRequestBuilder bulkRequest = null;
    long size = 0l;        

    client = getClient();

    bulkRequest = client.prepareBulk();
    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
    bulkRequest.setRefresh(DEFAULT_REFRESH);
    bulkRequest.setReplicationType(ReplicationType.ASYNC);

    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
    bulkRequest.setReplicationType(ReplicationType.ASYNC);
    bulkRequest.setRefresh(false);

    for (Record record : records) {
        Map<String, Object> source_fields = record.getSource();
        IndexRequestBuilder doc = client.prepareIndex(index, DEFAULT_DOC_TYPE, record.getId());
        doc.setConsistencyLevel(WriteConsistencyLevel.ONE);
        doc.setRefresh(false); // DEFAULT_REFRESH
        //doc.setCreate(true); // DEFAULT_CREATE
        doc.setReplicationType(ReplicationType.ASYNC);
        
        // Remove the id field from the source and set the source
        source_fields.remove(XmlReader.ID_FIELD);
        doc.setSource(source_fields);            
        
        logger.debug("record id: " + record.getId());
        //logger.debug("source keys: " + source_fields.keySet());
        
        // Add the size up to average it out later
        size += record.getJsonString().length();            

        bulkRequest.add(doc);
    }
    
    logger.debug("record source size avg (Kbs): " + (size / records.size() / 1024));
    
    ListenableActionFuture<BulkResponse> lbr = bulkRequest.execute();
    logger.debug("start actionGet() for BulkResponse");
    BulkResponse response = lbr.actionGet();
    logger.debug("end actionGet() for BulkResponse");
    if (response.hasFailures()) {
        logger.error("found the following errors while processing a bulk insert");
        for (BulkItemResponse item : response.items()) {
            if (item.failed()) {
                logger.debug(item.getIndex());
                logger.debug(item.getId());
                logger.error(item.failureMessage());
            }
        }
        throw new Exception("error bulk copying data into spec index " + index);
    }
}

What am I missing? Please help if you have an idea of why all of the documents end up on 1 shard.

--

--


(Clinton Gormley) #4

On Thu, 2012-08-23 at 06:15 -0700, amos.wood wrote:

We have an index named "partylogins" which we are trying to bulk load
using the Java API. The index has 4 shards and no replicas during our
bulk load process. However, all documents being indexed are being
saved in shard 3 of the index. The _routing field is not setup on
this index and we aren't using any aliases.

Do your docs use the parent field?

Because that implies routing

Also, why set indexing to async - you can overwhelm ES if you try to
index more than your nodes to handle. If you leave it as sync, then
you'll get automatic rate limiting. You can run multiple processes in
parallel, but ES will limit the overall rate to what it can handle.

Also, David's comment about indexing every 100 - 1000 docs instead of
after 17,500 is a good one. Indexing that number of docs can use up
lots of memory and slow things down

clint

The code we are using for bulk loading the documents is:

private void saveSpecRecords(final String index, final

List records) throws Exception {
Client client = null;
BulkRequestBuilder bulkRequest = null;
long size = 0l;

    client = getClient();

    bulkRequest = client.prepareBulk();
    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
    bulkRequest.setRefresh(DEFAULT_REFRESH);
    bulkRequest.setReplicationType(ReplicationType.ASYNC);

    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
    bulkRequest.setReplicationType(ReplicationType.ASYNC);
    bulkRequest.setRefresh(false);

    for (Record record : records) {
        Map<String, Object> source_fields = record.getSource();
        IndexRequestBuilder doc = client.prepareIndex(index,

DEFAULT_DOC_TYPE, record.getId());
doc.setConsistencyLevel(WriteConsistencyLevel.ONE);
doc.setRefresh(false); // DEFAULT_REFRESH
//doc.setCreate(true); // DEFAULT_CREATE
doc.setReplicationType(ReplicationType.ASYNC);

        // Remove the id field from the source and set the source
        source_fields.remove(XmlReader.ID_FIELD);
        doc.setSource(source_fields);            
        
        logger.debug("record id: " + record.getId());
        //logger.debug("source keys: " + source_fields.keySet());
        
        // Add the size up to average it out later
        size += record.getJsonString().length();            

        bulkRequest.add(doc);
    }
    
    logger.debug("record source size avg (Kbs): " + (size /

records.size() / 1024));

    ListenableActionFuture<BulkResponse> lbr =

bulkRequest.execute();
logger.debug("start actionGet() for BulkResponse");
BulkResponse response = lbr.actionGet();
logger.debug("end actionGet() for BulkResponse");
if (response.hasFailures()) {
logger.error("found the following errors while processing
a bulk insert");
for (BulkItemResponse item : response.items()) {
if (item.failed()) {
logger.debug(item.getIndex());
logger.debug(item.getId());
logger.error(item.failureMessage());
}
}
throw new Exception("error bulk copying data into spec
index " + index);
}
}

What am I missing? Please help if you have an idea of why all of the
documents end up on 1 shard.

--

--


(amos.wood) #5

I suggest that you execute the bulk request every 100 documents instead of
after 17k.

The 17Kb number is the size of the document and not the count. I have
tried executing the bulk request with 100 and 1000 and everything between
with no success.

--


(amos.wood) #6

On Thursday, August 23, 2012 8:40:26 AM UTC-5, Clinton Gormley wrote:

Do your docs use the parent field?
Because that implies routing

No.

--


(amos.wood) #7

Do you think that this issue has to do with routing?
If so, does the _routing field have to be "enabled" to use the document id
as the default?

By default, doesn't the routing of the document use a hash of document id
field?
If so, does it use the "_id" value from the IndexRequestBuilder:setId()
function for routing to the appropriate shard in the index

Another oddity is that when not bulk loading the documents and instead just
INDEXing every document, all of them still went to shard 3 of the index
which implies that it is not a bulk loading issue, right?

On Thursday, August 23, 2012 8:15:05 AM UTC-5, amos.wood wrote:

We have an index named "partylogins" which we are trying to bulk load
using the Java API. The index has 4 shards and no replicas during our bulk
load process. However, all documents being indexed are being saved in
shard 3 of the index. The _routing field is not setup on this index and we
aren't using any aliases.

The code we are using for bulk loading the documents is:

private void saveSpecRecords(final String index, final List<Record>records

) throws Exception {
Client client = null;
BulkRequestBuilder bulkRequest = null;
long size = 0l;

    client = getClient();

    bulkRequest = client.prepareBulk();
    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
    bulkRequest.setRefresh(DEFAULT_REFRESH);
    bulkRequest.setReplicationType(ReplicationType.ASYNC);

    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
    bulkRequest.setReplicationType(ReplicationType.ASYNC);
    bulkRequest.setRefresh(false);

    for (Record record : records) {
        Map<String, Object> source_fields = record.getSource();
        IndexRequestBuilder doc = client.prepareIndex(index,DEFAULT_DOC_TYPE

, record.getId());
doc.setConsistencyLevel(WriteConsistencyLevel.ONE);
doc.setRefresh(false); // DEFAULT_REFRESH
//doc.setCreate(true); // DEFAULT_CREATE
doc.setReplicationType(ReplicationType.ASYNC);

        // Remove the id field from the source and set the source
        source_fields.remove(XmlReader.ID_FIELD);
        doc.setSource(source_fields);            
        
        logger.debug("record id: " + record.getId());
        //logger.debug("source keys: " + source_fields.keySet());
        
        // Add the size up to average it out later
        size += record.getJsonString().length();            

        bulkRequest.add(doc);
    }
    
    logger.debug("record source size avg (Kbs): " + (size / records.

size() / 1024));

    ListenableActionFuture<BulkResponse> lbr = bulkRequest.execute();
    logger.debug("start actionGet() for BulkResponse");
    BulkResponse response = lbr.actionGet();
    logger.debug("end actionGet() for BulkResponse");
    if (response.hasFailures()) {
        logger.error("found the following errors while processing a 

bulk insert");
for (BulkItemResponse item : response.items()) {
if (item.failed()) {
logger.debug(item.getIndex());
logger.debug(item.getId());
logger.error(item.failureMessage());
}
}
throw new Exception("error bulk copying data into spec index "

  • index);
    }
    }

What am I missing? Please help if you have an idea of why all of the
documents end up on 1 shard.

--


(Clinton Gormley) #8

On Thu, 2012-08-23 at 06:53 -0700, amos.wood wrote:

Do you think that this issue has to do with routing?

Looks like it

If so, does the _routing field have to be "enabled" to use the
document id as the default?

No

By default, doesn't the routing of the document use a hash of document
id field?

Yes

If so, does it use the "_id" value from the
IndexRequestBuilder:setId() function for routing to the appropriate
shard in the index

A hash thereof, yes

So one possibility is that all of your _ids are hashing to the same
value (which seems unlikely)

I'd suggest trying to break the problem down into a smaller version
(especially as you know that there is a problem with what you are doing
with single doc indexing, rather than bulk)

Gist a curl recreation of the entire problem (from index creation
onwards), then the issue will quickly become apparent. (it may also
become apparent to you before you finish the gist)

clint

Another oddity is that when not bulk loading the documents and instead
just INDEXing every document, all of them still went to shard 3 of the
index which implies that it is not a bulk loading issue, right?

On Thursday, August 23, 2012 8:15:05 AM UTC-5, amos.wood wrote:
We have an index named "partylogins" which we are trying to
bulk load using the Java API. The index has 4 shards and no
replicas during our bulk load process. However, all documents
being indexed are being saved in shard 3 of the index. The
_routing field is not setup on this index and we aren't using
any aliases.

    The code we are using for bulk loading the documents is:
    
        private void saveSpecRecords(final String index, final
    List<Record> records) throws Exception {
            Client client = null;
            BulkRequestBuilder bulkRequest = null;
            long size = 0l;        
    
            client = getClient();
    
            bulkRequest = client.prepareBulk();
    
    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
            bulkRequest.setRefresh(DEFAULT_REFRESH);
            bulkRequest.setReplicationType(ReplicationType.ASYNC);
    
    
    bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
            bulkRequest.setReplicationType(ReplicationType.ASYNC);
            bulkRequest.setRefresh(false);
    
            for (Record record : records) {
                Map<String, Object> source_fields =
    record.getSource();
                IndexRequestBuilder doc =
    client.prepareIndex(index, DEFAULT_DOC_TYPE, record.getId());
    
    doc.setConsistencyLevel(WriteConsistencyLevel.ONE);
                doc.setRefresh(false); // DEFAULT_REFRESH
                //doc.setCreate(true); // DEFAULT_CREATE
                doc.setReplicationType(ReplicationType.ASYNC);
                
                // Remove the id field from the source and set the
    source
                source_fields.remove(XmlReader.ID_FIELD);
                doc.setSource(source_fields);            
                
                logger.debug("record id: " + record.getId());
                //logger.debug("source keys: " +
    source_fields.keySet());
                
                // Add the size up to average it out later
                size += record.getJsonString().length();
       
    
                bulkRequest.add(doc);
            }
            
            logger.debug("record source size avg (Kbs): " +
    (size / records.size() / 1024));
            
            ListenableActionFuture<BulkResponse> lbr =
    bulkRequest.execute();
            logger.debug("start actionGet() for BulkResponse");
            BulkResponse response = lbr.actionGet();
            logger.debug("end actionGet() for BulkResponse");
            if (response.hasFailures()) {
                logger.error("found the following errors while
    processing a bulk insert");
                for (BulkItemResponse item : response.items()) {
                    if (item.failed()) {
                        logger.debug(item.getIndex());
                        logger.debug(item.getId());
                        logger.error(item.failureMessage());
                    }
                }
                throw new Exception("error bulk copying data into
    spec index " + index);
            }
        }
    
    
    
    
    What am I missing?  Please help if you have an idea of why all
    of the documents end up on 1 shard.

--

--


(David Pilato) #9

it may also become apparent to you before you finish the gist
+1000

I hate that. Often, I started to create a nice Gist to expose my issues with full explanation and comments and the solution came by the question itself! :wink: -> just end with throwing the email in the trashcan ! :wink:

--
David :wink:
Twitter : @dadoonet / @elasticsearchfr / @scrutmydocs

Le 23 août 2012 à 16:01, Clinton Gormley clint@traveljury.com a écrit :

On Thu, 2012-08-23 at 06:53 -0700, amos.wood wrote:

Do you think that this issue has to do with routing?

Looks like it

If so, does the _routing field have to be "enabled" to use the
document id as the default?

No

By default, doesn't the routing of the document use a hash of document
id field?

Yes

If so, does it use the "_id" value from the
IndexRequestBuilder:setId() function for routing to the appropriate
shard in the index

A hash thereof, yes

So one possibility is that all of your _ids are hashing to the same
value (which seems unlikely)

I'd suggest trying to break the problem down into a smaller version
(especially as you know that there is a problem with what you are doing
with single doc indexing, rather than bulk)

Gist a curl recreation of the entire problem (from index creation
onwards), then the issue will quickly become apparent. (it may also
become apparent to you before you finish the gist)

clint

Another oddity is that when not bulk loading the documents and instead
just INDEXing every document, all of them still went to shard 3 of the
index which implies that it is not a bulk loading issue, right?

On Thursday, August 23, 2012 8:15:05 AM UTC-5, amos.wood wrote:
We have an index named "partylogins" which we are trying to
bulk load using the Java API. The index has 4 shards and no
replicas during our bulk load process. However, all documents
being indexed are being saved in shard 3 of the index. The
_routing field is not setup on this index and we aren't using
any aliases.

   The code we are using for bulk loading the documents is:

       private void saveSpecRecords(final String index, final
   List<Record> records) throws Exception {
           Client client = null;
           BulkRequestBuilder bulkRequest = null;
           long size = 0l;        

           client = getClient();

           bulkRequest = client.prepareBulk();

   bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
           bulkRequest.setRefresh(DEFAULT_REFRESH);
           bulkRequest.setReplicationType(ReplicationType.ASYNC);


   bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE);
           bulkRequest.setReplicationType(ReplicationType.ASYNC);
           bulkRequest.setRefresh(false);

           for (Record record : records) {
               Map<String, Object> source_fields =
   record.getSource();
               IndexRequestBuilder doc =
   client.prepareIndex(index, DEFAULT_DOC_TYPE, record.getId());

   doc.setConsistencyLevel(WriteConsistencyLevel.ONE);
               doc.setRefresh(false); // DEFAULT_REFRESH
               //doc.setCreate(true); // DEFAULT_CREATE
               doc.setReplicationType(ReplicationType.ASYNC);

               // Remove the id field from the source and set the
   source
               source_fields.remove(XmlReader.ID_FIELD);
               doc.setSource(source_fields);            

               logger.debug("record id: " + record.getId());
               //logger.debug("source keys: " +
   source_fields.keySet());

               // Add the size up to average it out later
               size += record.getJsonString().length();


               bulkRequest.add(doc);
           }

           logger.debug("record source size avg (Kbs): " +
   (size / records.size() / 1024));

           ListenableActionFuture<BulkResponse> lbr =
   bulkRequest.execute();
           logger.debug("start actionGet() for BulkResponse");
           BulkResponse response = lbr.actionGet();
           logger.debug("end actionGet() for BulkResponse");
           if (response.hasFailures()) {
               logger.error("found the following errors while
   processing a bulk insert");
               for (BulkItemResponse item : response.items()) {
                   if (item.failed()) {
                       logger.debug(item.getIndex());
                       logger.debug(item.getId());
                       logger.error(item.failureMessage());
                   }
               }
               throw new Exception("error bulk copying data into
   spec index " + index);
           }
       }




   What am I missing?  Please help if you have an idea of why all
   of the documents end up on 1 shard.

--

--

--


(Wes Plunk) #10

That would be nice, we're working on the gist now. Will provide results
here either way

On Thursday, August 23, 2012 9:20:43 AM UTC-5, David Pilato wrote:

it may also become apparent to you before you finish the gist
+1000

I hate that. Often, I started to create a nice Gist to expose my issues
with full explanation and comments and the solution came by the question
itself! :wink: -> just end with throwing the email in the trashcan ! :wink:

--
David :wink:
Twitter : @dadoonet / @elasticsearchfr / @scrutmydocs

Le 23 août 2012 à 16:01, Clinton Gormley <cl...@traveljury.com<javascript:>>
a écrit :

On Thu, 2012-08-23 at 06:53 -0700, amos.wood wrote:

Do you think that this issue has to do with routing?

Looks like it

If so, does the _routing field have to be "enabled" to use the
document id as the default?

No

By default, doesn't the routing of the document use a hash of document
id field?

Yes

If so, does it use the "_id" value from the
IndexRequestBuilder:setId() function for routing to the appropriate
shard in the index

A hash thereof, yes

So one possibility is that all of your _ids are hashing to the same
value (which seems unlikely)

I'd suggest trying to break the problem down into a smaller version
(especially as you know that there is a problem with what you are doing
with single doc indexing, rather than bulk)

Gist a curl recreation of the entire problem (from index creation
onwards), then the issue will quickly become apparent. (it may also
become apparent to you before you finish the gist)

clint

Another oddity is that when not bulk loading the documents and instead
just INDEXing every document, all of them still went to shard 3 of the
index which implies that it is not a bulk loading issue, right?

On Thursday, August 23, 2012 8:15:05 AM UTC-5, amos.wood wrote:
We have an index named "partylogins" which we are trying to
bulk load using the Java API. The index has 4 shards and no
replicas during our bulk load process. However, all documents
being indexed are being saved in shard 3 of the index. The
_routing field is not setup on this index and we aren't using
any aliases.

   The code we are using for bulk loading the documents is: 

       private void saveSpecRecords(final String index, final 
   List<Record> records) throws Exception { 
           Client client = null; 
           BulkRequestBuilder bulkRequest = null; 
           long size = 0l;         

           client = getClient(); 

           bulkRequest = client.prepareBulk(); 

   bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE); 
           bulkRequest.setRefresh(DEFAULT_REFRESH); 
           bulkRequest.setReplicationType(ReplicationType.ASYNC); 


   bulkRequest.setConsistencyLevel(WriteConsistencyLevel.ONE); 
           bulkRequest.setReplicationType(ReplicationType.ASYNC); 
           bulkRequest.setRefresh(false); 

           for (Record record : records) { 
               Map<String, Object> source_fields = 
   record.getSource(); 
               IndexRequestBuilder doc = 
   client.prepareIndex(index, DEFAULT_DOC_TYPE, record.getId()); 

   doc.setConsistencyLevel(WriteConsistencyLevel.ONE); 
               doc.setRefresh(false); // DEFAULT_REFRESH 
               //doc.setCreate(true); // DEFAULT_CREATE 
               doc.setReplicationType(ReplicationType.ASYNC); 

               // Remove the id field from the source and set the 
   source 
               source_fields.remove(XmlReader.ID_FIELD); 
               doc.setSource(source_fields);             

               logger.debug("record id: " + record.getId()); 
               //logger.debug("source keys: " + 
   source_fields.keySet()); 

               // Add the size up to average it out later 
               size += record.getJsonString().length(); 


               bulkRequest.add(doc); 
           } 

           logger.debug("record source size avg (Kbs): " + 
   (size / records.size() / 1024)); 

           ListenableActionFuture<BulkResponse> lbr = 
   bulkRequest.execute(); 
           logger.debug("start actionGet() for BulkResponse"); 
           BulkResponse response = lbr.actionGet(); 
           logger.debug("end actionGet() for BulkResponse"); 
           if (response.hasFailures()) { 
               logger.error("found the following errors while 
   processing a bulk insert"); 
               for (BulkItemResponse item : response.items()) { 
                   if (item.failed()) { 
                       logger.debug(item.getIndex()); 
                       logger.debug(item.getId()); 
                       logger.error(item.failureMessage()); 
                   } 
               } 
               throw new Exception("error bulk copying data into 
   spec index " + index); 
           } 
       } 




   What am I missing?  Please help if you have an idea of why all 
   of the documents end up on 1 shard. 

--

--

--


(Wes Plunk) #11

That would be nice, we're working on the gist now. Will provide results
here either way

--


(Wes Plunk) #12

Here's the gist: https://gist.github.com/3439162

If you need more data please just let us know. Our conclusion from this
example is that using numeric ids vs guids is causing the problem. Thoughts
as to why? Does this run and perform the same on your system?

We have 2 nodes on one cluster and 4 on another and this example produced
the same results on both clusters

Our definition of failure: It fails when all records are written to the
same shard regardless of how the shards are allocated to the nodes

--


(David Pilato) #13

How did you generate guids?

The way I understood shard repartition is that everything is based on internal guid computed for each id. Could it be a coincidence that you generated only guids that will be routed to the same shard?

--
David :wink:
Twitter : @dadoonet / @elasticsearchfr / @scrutmydocs

Le 23 août 2012 à 19:50, Wes Plunk wes@wesandemily.com a écrit :

Here's the gist: https://gist.github.com/3439162

If you need more data please just let us know. Our conclusion from this example is that using numeric ids vs guids is causing the problem. Thoughts as to why? Does this run and perform the same on your system?

We have 2 nodes on one cluster and 4 on another and this example produced the same results on both clusters

Our definition of failure: It fails when all records are written to the same shard regardless of how the shards are allocated to the nodes

--


(Clinton Gormley) #14

Hi Wes

On Thu, 2012-08-23 at 10:50 -0700, Wes Plunk wrote:

Here's the gist: https://gist.github.com/3439162

If you need more data please just let us know. Our conclusion from
this example is that using numeric ids vs guids is causing the
problem. Thoughts as to why? Does this run and perform the same on
your system?

This does exactly as you describe on our system. I also tried changing
a few IDs (adding a character) and then it spread them onto more nodes.

So bizarrely, it seems that almost all your IDs are being hashed to the
same shard, which is either an amazing coincidence or a bug, I'd say.

Please open an issue linking to this gist

ta

clint

We have 2 nodes on one cluster and 4 on another and this example
produced the same results on both clusters

Our definition of failure: It fails when all records are written to
the same shard regardless of how the shards are allocated to the nodes

--

--


(Drew Raines) #15

Clinton Gormley wrote:

On Thu, 2012-08-23 at 10:50 -0700, Wes Plunk wrote:

Here's the gist: https://gist.github.com/3439162

If you need more data please just let us know. Our conclusion from
this example is that using numeric ids vs guids is causing the
problem. Thoughts as to why? Does this run and perform the same on
your system?

This does exactly as you describe on our system. I also tried
changing a few IDs (adding a character) and then it spread them
onto more nodes.

So bizarrely, it seems that almost all your IDs are being hashed to
the same shard, which is either an amazing coincidence or a bug,
I'd say.

Interesting!

The hash calculation (based on DjbHashFunction) does produce
different ints for each uuid:

http://p.draines.com/13457538251189dc8c3cc.txt

However, for this case, it happens to produce ints that are always
divisible by 4. This causes the modulo calculation in
PlainOperationRouting.shardId() to always return the same number (in
my case it's 0).

Math.abs(h % n)

I've played around with different scenarios and it's kind of
hit-or-miss. Different 36-byte ids with n that's a multiple of 2
doesn't get a good random distribution. For example, here's n == 32
(taking only 40 ids):

(4 4 0 20 4 12 20 28 8 0 8 0 28 12 20 0 0 28 12 24 28 12 0 4 28 28 4 28 12 24 28 12 4 16 4 20 12 24 8 24)

But n == 33 is:

(21 21 21 1 11 1 11 15 29 25 2 2 30 21 14 10 12 10 31 12 8 20 19 4 18 21 18 7 7 10 28 14 32 27 13 28 10 31 11 24)

...which is a little better.

I'm no crypto expert but it may be that this is a "limitation" of the
djb hash function. I'm not sure what the right fix is for ES. A
provision might need to be put in place for this exact scenario where
it appends some random bytes to the end of the id to avoid this
particular bug.

From a practical standpoint, Wes & Amos, you guys can Base64 your id
into a 48-byte string like this which will distribute more evenly
(using Base64.encodeBytes(id.getBytes())):

(1 2 3 2 2 0 3 3 1 1 0 3 0 2 0 3 3 3 3 3 3 0 0 3 2 3 0 3 3 2 2 0 1 1 0 1 2 2 1 1)

If your UUIDs aren't meaningful apart from this index, then you can
use an ES-generated 22-byte UUID.randomBase64UUID to save disk and
ram. 40 of these with 4 shards yields this distribution:

(3 0 0 3 0 1 3 1 2 1 0 3 1 2 2 2 0 1 1 0 2 2 3 1 3 0 2 3 2 0 0 0 1 3 3 3 2 0 1 0)

-Drew

--


(amos.wood) #16

Drew -- Thanks for the info about the djb hash function.

In our case, the ID is meaningful because it is how that get/update the
document and how that the documents are joined together.

Would it be possible to solve our case by changing the "n" in the equation
from 4 shards to something like 5 or 7? Would that tend to balance things
better?

--


(Drew Raines) #17

amos.wood wrote:

Drew -- Thanks for the info about the djb hash function.

In our case, the ID is meaningful because it is how that get/update
the document and how that the documents are joined together.

Would it be possible to solve our case by changing the "n" in the
equation from 4 shards to something like 5 or 7? Would that tend to
balance things better?

Yes! Forgot to mention that. Changing the shard count will also
better distribute documents. An odd number, like you say 5 or 7,
would work best.

-Drew

--


(Ivan Brusic) #18

Very interesting read!

Pluggable hash functions would be interesting (and dangerous at the same time).

--
Ivan

On Thu, Aug 23, 2012 at 2:54 PM, Drew Raines aaraines@gmail.com wrote:

Clinton Gormley wrote:

On Thu, 2012-08-23 at 10:50 -0700, Wes Plunk wrote:

Here's the gist: https://gist.github.com/3439162

If you need more data please just let us know. Our conclusion from
this example is that using numeric ids vs guids is causing the
problem. Thoughts as to why? Does this run and perform the same on
your system?

This does exactly as you describe on our system. I also tried
changing a few IDs (adding a character) and then it spread them
onto more nodes.

So bizarrely, it seems that almost all your IDs are being hashed to
the same shard, which is either an amazing coincidence or a bug,
I'd say.

Interesting!

The hash calculation (based on DjbHashFunction) does produce
different ints for each uuid:

http://p.draines.com/13457538251189dc8c3cc.txt

However, for this case, it happens to produce ints that are always
divisible by 4. This causes the modulo calculation in
PlainOperationRouting.shardId() to always return the same number (in
my case it's 0).

Math.abs(h % n)

I've played around with different scenarios and it's kind of
hit-or-miss. Different 36-byte ids with n that's a multiple of 2
doesn't get a good random distribution. For example, here's n == 32
(taking only 40 ids):

(4 4 0 20 4 12 20 28 8 0 8 0 28 12 20 0 0 28 12 24 28 12 0 4 28 28 4 28 12 24 28 12 4 16 4 20 12 24 8 24)

But n == 33 is:

(21 21 21 1 11 1 11 15 29 25 2 2 30 21 14 10 12 10 31 12 8 20 19 4 18 21 18 7 7 10 28 14 32 27 13 28 10 31 11 24)

...which is a little better.

I'm no crypto expert but it may be that this is a "limitation" of the
djb hash function. I'm not sure what the right fix is for ES. A
provision might need to be put in place for this exact scenario where
it appends some random bytes to the end of the id to avoid this
particular bug.

From a practical standpoint, Wes & Amos, you guys can Base64 your id
into a 48-byte string like this which will distribute more evenly
(using Base64.encodeBytes(id.getBytes())):

(1 2 3 2 2 0 3 3 1 1 0 3 0 2 0 3 3 3 3 3 3 0 0 3 2 3 0 3 3 2 2 0 1 1 0 1 2 2 1 1)

If your UUIDs aren't meaningful apart from this index, then you can
use an ES-generated 22-byte UUID.randomBase64UUID to save disk and
ram. 40 of these with 4 shards yields this distribution:

(3 0 0 3 0 1 3 1 2 1 0 3 1 2 2 2 0 1 1 0 2 2 3 1 3 0 2 3 2 0 0 0 1 3 3 3 2 0 1 0)

-Drew

--

--


(system) #19