Is BulkProcessor safe to use?

Hi,

I'm trying to implement a solution for fast bulk inserts of data into ES, and BulkProcessor is currently the fastest way of inserting data that I have found. I have also tried a multi-threaded approach with BulkRequestBuilder, but I wasn't able to reach the throughput of BulkProcessor.

My problem is that at some point I receive exceptions in the Listener like "no node is available", and, most importantly, if I stress it hard I see more docs than the ones I inserted (e.g. I insert 500000 docs and in the end I see something like 518018 docs).

Your help will be much appreciated.

I create a processor as follows:

----- CODE ------
processor = BulkProcessor.builder(client, new BulkProcessor.Listener() {

            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.debug(String.format("Execution: %s, about to execute new bulk insert composed of {%s} actions",
                        executionId, request.numberOfActions()));
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                log.debug(String.format("Execution: %s, bulk insert composed of {%s} actions, took %s",
                        executionId, request.numberOfActions(),
                        formatUtils.getDurationDHMS(response.getTookInMillis())));
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // throw new RuntimeException(String.format("Execution with id %s failed", executionId), failure);
            }
        }).setConcurrentRequests(500).build();
----- CODE ------


Examine your code (which I can't find in your post) where you add items
to the bulk request. That procedure must be thread-safe. Next, check whether
your doc ID creation really produces unique IDs.
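
If you do not control the IDs yourself, a sketch like this shows the idea
(the index/type names and the key/JSON helpers are only placeholders I made
up, not your code): with an explicit, deterministic ID a re-sent action
overwrites the same document instead of creating a second one under an
auto-generated ID.

----- CODE ------
// Sketch only: "my-index", "my-type", getNaturalKey() and toJson() are placeholders.
String id = record.getNaturalKey();   // must be stable and unique per logical document
processor.add(new IndexRequest("my-index", "my-type", id).source(record.toJson()));
----- CODE ------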

Note, if your code snippet is accurate, 500 concurrent requests will
massively slow down your bulk ingestion and add significant thread-management
overhead (and, by default, the ES thread pools are not large enough for you
to really expect this to work efficiently).

"No node available" is no surprise under this circumstances, it is an
exception if you "overwhelm" ES and it can't respond within 5 seconds.
You need to streamline your bulk ingest - beside less concurrency, check
how much throughput your node heap & disk i/o can process, and adjust
the client bulk max actions and max concurrency.
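
As a starting point, something like the following is much closer to what a
single node can usually absorb; the numbers are only guesses to tune against
your own hardware, not official recommendations ("listener" stands for the
listener from your snippet):

----- CODE ------
// Sketch with conservative settings, to be tuned against heap, disk I/O and network.
BulkProcessor processor = BulkProcessor.builder(client, listener)
        .setBulkActions(5000)                                // flush every 5000 actions...
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))  // ...or when ~5 MB have accumulated
        .setConcurrentRequests(1)                            // one bulk in flight while the next fills
        .build();
----- CODE ------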

Jörg


Thomas,

See https://groups.google.com/d/msg/elasticsearch/jUoI1xGAVio/dnY11_VUL4AJ
for my post to someone else about how to use the BulkRequestBuilder. It's
not fully filled-out code, but it is nice high-level pseudocode that should
help a lot.

One other thing: Elasticsearch threading, especially in 0.90 and above,
does such a superb job that my bulk loader is serial (one thread). Besides,
my customer's updates contain a mix of deletes and inserts, and they must
be processed in a specific order. For example, if a delete for A is
followed by an insert for A, it would be a very bad thing to multithread
this and have one thread issue the insert for A before another thread
issued the delete for A. So what my customer gives me also happens to be one
of the best ways to use Elasticsearch.
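
Very roughly, the serial loop looks like this (just a sketch, not my actual
code; the Update type, the index/type names and the batch size are
placeholders):

----- CODE ------
BulkRequestBuilder bulk = client.prepareBulk();

for (Update u : updates) {                      // one thread, so delete/insert order is preserved
    if (u.isDelete()) {
        bulk.add(client.prepareDelete("my-index", "my-type", u.getId()));
    } else {
        bulk.add(client.prepareIndex("my-index", "my-type", u.getId()).setSource(u.toJson()));
    }
    if (bulk.numberOfActions() >= 5000) {       // flush in chunks
        BulkResponse response = bulk.execute().actionGet();
        if (response.hasFailures()) {
            log.error(response.buildFailureMessage());
        }
        bulk = client.prepareBulk();
    }
}
if (bulk.numberOfActions() > 0) {
    bulk.execute().actionGet();                 // flush the tail
}
----- CODE ------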

Hope this helps!

Brian


First of all, thank you all for your replies :)

Dear Jörg, I will try it with fewer concurrent requests, but the thing is
that performance will probably drop, so I may go back to my other
implementation, the one using BulkRequestBuilder, where I don't have these issues.

My question, though, is whether BulkProcessor is a class meant for general
use, or whether it is an internal class of the Elasticsearch engine and I am
probably not using it correctly.

Dear Brian, between the implementation you propose and mine there is one
difference: the bulkRequest.setRefresh(false) call. But I checked the code and
it is false by default, probably because I use 0.90.2; maybe on version
0.20 that wasn't the case.

I will try your proposals again and let you know.

Thanks again

Thomas


Note that ES thread pools are aligned with the number of CPU cores you
have. A higher number of threads will not result in higher performance.
So, if you really must use 500 concurrent threads for best bulk
performance, you either have the machine with the most CPU cores in the
world, or your code has some other fundamental problem. You should check
whether you can process 8-10 MB of bulk data per second per node (for
example, by monitoring the network connection).
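
You already have the numbers you need in your listener; a rough way to watch
throughput (only a sketch for orientation, not a precise benchmark) is:

----- CODE ------
// Sketch: derive a rough docs/sec figure from what the listener already receives.
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    long tookMs = response.getTookInMillis();
    long docsPerSec = tookMs > 0 ? request.numberOfActions() * 1000L / tookMs : -1;
    log.debug(String.format("Execution %s: %s actions in %s ms (~%s docs/sec)",
            executionId, request.numberOfActions(), tookMs, docsPerSec));
    if (response.hasFailures()) {
        log.warn(response.buildFailureMessage());
    }
}
----- CODE ------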

The BulkProcessor class is not marked for internal use (as no ES class
is). I think that as long as it is not marked "@deprecated" it will stay in
the ES API (if not, I could provide a substitute class as a plugin, because
BulkProcessor does no magic, but I think that will not be necessary).
Remember, the people at ES are really helpful and supportive, and
high-performance bulk indexing is a strong requirement for many of us in the
community.

Jörg


Dear Jörg,

Thank you very much for your reply; in the end I went with the
BulkRequestBuilder implementation. The 500 concurrent requests were of course
a stress test for our cluster; I ended up with bigger queues, though. We will
now play a bit with more CPU/memory, although I think CPU is the key to
seeing an immediate performance increase. Still new to Elasticsearch, but
excited to dig deeper into it :)

Thanks again

Thomas
