Bulk inserting is slow


(phoenix) #1

Hi everyone,

I'm inserting around 265 000 documents into an Elasticsearch cluster
composed of 3 nodes (real servers).
On two servers I give Elasticsearch 20g of heap; on the third one, which has
64g of RAM, I set 30g of heap for Elasticsearch.

I set the Elasticsearch configuration to:

  • 3 shards (1 per server)
  • 0 replicas
  • discovery.zen.ping.multicast.enabled: false (each node is given the
    unicast hostnames of the other two nodes);
  • and this:

indices.memory.index_buffer_size: 50%
index.refresh_interval: 30s
threadpool:
    index:
        type: fixed
        size: 30
        queue_size: 1000
    bulk:
        queue_size: 1000
    bulk:
        type: fixed
        size: 30
        queue_size: 1000
    search:
        type: fixed
        size: 100
        queue_size: 200
    get:
        type: fixed
        size: 100
        queue_size: 200

Indexing is done in batches of 100 000 docs. Here is my application log:
INFO: Adding records to bulk insert batch
INFO: Added 100000 records to bulk insert batch. Inserting batch...
-- Bulk insert took 38.724 secondes
INFO: Adding records to bulk insert batch
INFO: Added 100000 records to bulk insert batch. Inserting batch...
-- Bulk insert took 31.134 secondes
INFO: Adding records to bulk insert batch
INFO: Added 64201 records to bulk insert batch. Inserting batch...
-- Bulk insert took 17.366 secondes

--- Import CSV file took 108.905 secondes ---

I'm wondering whether this time is reasonable, or whether there is something I
can do to improve performance?
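The timings in the log can be turned into throughput figures. This is plain arithmetic on the logged numbers (ImportStats is just an illustrative name): the three bulk calls account for about 87.2 s of the 108.9 s import, so roughly 21.7 s went to work outside the timed bulk calls, such as reading the CSV and building the JSON documents, and overall throughput is about 2 400 docs/s.

```java
/** Derives throughput figures from the import log posted above. */
final class ImportStats {
    // Batch sizes and timings copied from the application log.
    static final long[] DOCS = {100_000, 100_000, 64_201};
    static final double[] SECONDS = {38.724, 31.134, 17.366};
    static final double TOTAL_SECONDS = 108.905;

    static long totalDocs() {
        long sum = 0;
        for (long d : DOCS) sum += d;
        return sum;
    }

    static double bulkSeconds() {
        double sum = 0;
        for (double s : SECONDS) sum += s;
        return sum;
    }

    public static void main(String[] args) {
        for (int i = 0; i < DOCS.length; i++) {
            System.out.println("batch " + (i + 1) + ": " + Math.round(DOCS[i] / SECONDS[i]) + " docs/s");
        }
        System.out.println("overall: " + Math.round(totalDocs() / TOTAL_SECONDS) + " docs/s");
        // Time not spent inside the three timed bulk calls (CSV parsing, JSON building, ...).
        System.out.println("outside bulk calls: " + (TOTAL_SECONDS - bulkSeconds()) + " s");
    }
}
```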

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/3a38e79e-9afb-4146-a7e1-7984ec082e22%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


(Jörg Prante) #2

Your bulk insert size is too large. It makes no sense to insert 100 000 docs
with one request. Use 1000-10000 instead.
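As a sketch of that batch size in practice, here is a plain-JDK helper (the name Batches is hypothetical) that streams any iterator into fixed-size batches and hands each one to a callback, where a real client would build and send one bulk request. At 10 000 docs per batch, the 264 201 records from the log become 27 bulk requests, the last holding 4 201 docs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

/** Streams records into fixed-size batches; each batch would become one bulk request. */
final class Batches {
    static <T> int forEachBatch(Iterator<T> source, int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be > 0");
        int batches = 0;
        List<T> current = new ArrayList<>(batchSize);
        while (source.hasNext()) {
            current.add(source.next());
            if (current.size() == batchSize) { // batch full: hand it off, start a new one
                sink.accept(current);
                current = new ArrayList<>(batchSize);
                batches++;
            }
        }
        if (!current.isEmpty()) {               // trailing partial batch
            sink.accept(current);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        // 264 201 stand-in records: the total from the import log.
        Iterator<Integer> records = IntStream.range(0, 264_201).iterator();
        List<Integer> sizes = new ArrayList<>();
        int n = forEachBatch(records, 10_000, batch -> sizes.add(batch.size()));
        System.out.println(n + " batches, last has " + sizes.get(sizes.size() - 1) + " docs");
    }
}
```

Handing batches to a callback, instead of collecting them all into a list, keeps at most one batch in memory at a time.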

Also, you should submit bulk requests in parallel, not sequentially as
you do now. Sequential bulk is slow if the client CPU/network is not saturated.
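One way to send bulk requests in parallel without touching the CSV iterator from several threads is a single reader thread that builds batches and hands each finished batch to a small thread pool. The sketch below uses only the JDK; BulkSender is a hypothetical stand-in for whatever executes one bulk request (for example, code like the batchInsert() method posted later in this thread), and the pool size of 4 is an arbitrary assumption to tune.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

/** One reader thread builds batches; a small fixed pool sends them concurrently. */
final class ParallelBulk {
    /** Hypothetical stand-in for "execute one bulk request and wait for its response". */
    interface BulkSender<T> { void send(List<T> batch) throws Exception; }

    static <T> void run(Iterator<T> source, int batchSize, int threads, BulkSender<T> sender)
            throws Exception {
        // Bounded queue + CallerRunsPolicy: when the senders fall behind, the reader
        // thread sends a batch itself instead of buffering the whole file in memory.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(threads),
                new ThreadPoolExecutor.CallerRunsPolicy());
        List<Future<?>> pending = new ArrayList<>();
        try {
            List<T> current = new ArrayList<>(batchSize);
            while (source.hasNext()) {             // only this thread touches the iterator
                current.add(source.next());
                if (current.size() == batchSize || !source.hasNext()) {
                    final List<T> batch = current;
                    pending.add(pool.submit(() -> { sender.send(batch); return null; }));
                    current = new ArrayList<>(batchSize);
                }
            }
            // Surfaces the first failed batch as an ExecutionException.
            for (Future<?> f : pending) f.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicLong sent = new AtomicLong();
        Iterator<Integer> records = IntStream.range(0, 264_201).iterator();
        run(records, 10_000, 4, batch -> sent.addAndGet(batch.size()));
        System.out.println("sent " + sent.get() + " docs");
    }
}
```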

Check whether you have disabled index refresh (changed it from 1s to -1) while bulk
indexing is active. A 30s interval does not make much sense if the bulk can
complete within that time.
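The suggestion amounts to setting index.refresh_interval to -1 before the load and restoring the previous value afterwards, even if the load fails. A minimal JDK-only sketch of that pattern follows. IndexSettings is a hypothetical interface: a real implementation would call the index update-settings API instead of writing to a map.

```java
import java.util.HashMap;
import java.util.Map;

/** Turns refresh off for the duration of a bulk load and always restores the old value. */
final class RefreshToggle {
    /** Hypothetical view of an index's dynamic settings (not an Elasticsearch class). */
    interface IndexSettings {
        String get(String key);
        void put(String key, String value);
    }

    static final String REFRESH = "index.refresh_interval";

    static void withRefreshDisabled(IndexSettings settings, Runnable bulkLoad) {
        String previous = settings.get(REFRESH);
        settings.put(REFRESH, "-1");                 // -1 disables periodic refresh
        try {
            bulkLoad.run();
        } finally {
            // Put back the old interval even if the load failed; fall back to the 1s default.
            settings.put(REFRESH, previous != null ? previous : "1s");
        }
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put(REFRESH, "30s");                   // the value from the first post
        IndexSettings settings = new IndexSettings() {
            @Override public String get(String key) { return store.get(key); }
            @Override public void put(String key, String value) { store.put(key, value); }
        };
        withRefreshDisabled(settings,
                () -> System.out.println("during load: " + store.get(REFRESH)));
        System.out.println("after load: " + store.get(REFRESH));
    }
}
```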

Do not set the indexing memory (indices.memory.index_buffer_size) to 50%.

It makes no sense to increase queue_size for the bulk thread pool to 1000. This
means you want a single ES node to accept 1000 x 100 000 = 100 000 000 =
100m docs at once. That simply exceeds all reasonable limits and will bring
the node down with an OOM (if you really have 100m docs).
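The estimate is easy to reproduce: the worst case a node can be asked to hold is queue_size × docs per bulk request. The 1 KB per document below is purely an assumption for illustration (the thread never says how large the documents are); at that size, 100m queued docs would be about 95 GiB of request data, far beyond the 20-30g heaps in this cluster.

```java
/** Worst-case documents (and bytes) a node could be asked to queue for bulk indexing. */
final class QueueEstimate {
    static long queuedDocs(long queueSize, long docsPerBulk) {
        return Math.multiplyExact(queueSize, docsPerBulk); // throws instead of overflowing
    }

    static double gibibytes(long docs, long bytesPerDoc) {
        return Math.multiplyExact(docs, bytesPerDoc) / (1024.0 * 1024 * 1024);
    }

    public static void main(String[] args) {
        long docs = queuedDocs(1_000, 100_000);   // queue_size and batch size from the first post
        long assumedBytesPerDoc = 1_024;          // ASSUMPTION: ~1 KB per document
        System.out.println(docs + " docs, roughly "
                + Math.round(gibibytes(docs, assumedBytesPerDoc)) + " GiB of request data");
        System.out.println(queuedDocs(1_000, 10_000) + " docs with 10 000-doc batches");
    }
}
```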

More advice is possible if you can show the client code you use to push docs
to ES.

Jörg

In reply to Frederic Esnault (Mon, Jun 23, 2014, 12:30 PM).


(Michael McCandless) #3

You'll actually get better indexing performance if you leave refresh
enabled, maybe at 5s. This is because ES then has a separate refresh thread
do the flushing, instead of having your bulk indexing threads do it
when RAM is full, which effectively gives you one more thread of concurrency.
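The point is about who pays for turning the in-memory buffer into a segment. The following is a deliberately simplified, deterministic toy model, not Elasticsearch or Lucene code: time is counted in documents indexed, the buffer holds 1 000 docs, and a refresh every 600 docs stands in for a periodic refresh_interval. All of these numbers are arbitrary assumptions. With refresh off, every flush lands on the indexing thread; with refresh on, the background refresh empties the buffer before it ever fills.

```java
/**
 * Toy model (NOT Elasticsearch internals): counts how many buffer flushes land on the
 * indexing thread, with and without a periodic background refresh. Time is measured in
 * "docs indexed", so the result is deterministic.
 */
final class RefreshToyModel {
    /** Returns {flushes done by the indexer, flushes done by the refresher}. */
    static int[] simulate(int docs, int bufferCapacity, int refreshEveryDocs) {
        int buffered = 0, byIndexer = 0, byRefresher = 0;
        for (int t = 1; t <= docs; t++) {
            buffered++;
            if (buffered >= bufferCapacity) {        // buffer full: the indexer must flush
                byIndexer++;
                buffered = 0;
            } else if (refreshEveryDocs > 0 && t % refreshEveryDocs == 0) {
                byRefresher++;                       // background refresh empties the buffer
                buffered = 0;
            }
        }
        return new int[] {byIndexer, byRefresher};
    }

    public static void main(String[] args) {
        int[] off = simulate(10_000, 1_000, 0);      // refresh disabled (-1)
        int[] on = simulate(10_000, 1_000, 600);     // periodic refresh
        System.out.println("refresh off: indexer=" + off[0] + ", refresher=" + off[1]);
        System.out.println("refresh on:  indexer=" + on[0] + ", refresher=" + on[1]);
    }
}
```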

Mike McCandless

http://blog.mikemccandless.com

In reply to Jörg Prante (Mon, Jun 23, 2014, 6:56 AM).


(phoenix) #4

Thanks for all this.

I changed my config: I removed all the thread pool settings, reduced the refresh
interval to 5s following Michael's advice, and limited my batches to 10 000.
I'll see how it works, then I'll parallelize the bulk insert.
I'll let you know how it turns out.

Thanks again!

In reply to Jörg Prante (Monday, June 23, 2014, 12:56:14 UTC+2).


(phoenix) #5

Hi again,

Any idea how to parallelize the bulk insert process?
I tried creating 4 BulkInserters extending RecursiveAction and executing
them all, but the result was awful: 3 of them finished very slowly, one
did not finish at all (I don't know why), and I got only 70K docs in ES
instead of 265 000...
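The BulkInserter code isn't shown, so this is only a guess: one common way to lose documents in this kind of setup is to let several tasks call hasNext()/next() on the same CSV iterator. A plain Iterator is not thread-safe, so records can be skipped or read twice, and a task can fail with an exception mid-run. A minimal fix, sketched below with JDK classes only (SharedBatchSource is a hypothetical name), is to make "take the next batch" a single synchronized step so each worker gets a disjoint batch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

/** Lets several workers pull disjoint batches from one non-thread-safe iterator. */
final class SharedBatchSource<T> {
    private final Iterator<T> source;

    SharedBatchSource(Iterator<T> source) { this.source = source; }

    /** hasNext()/next() run under one lock, so no record is read twice or skipped. */
    synchronized List<T> nextBatch(int max) {
        List<T> batch = new ArrayList<>(max);
        while (batch.size() < max && source.hasNext()) batch.add(source.next());
        return batch;                                // empty list means "no more records"
    }

    public static void main(String[] args) throws InterruptedException {
        SharedBatchSource<Integer> shared =
                new SharedBatchSource<>(IntStream.range(0, 264_201).iterator());
        AtomicLong indexed = new AtomicLong();
        ExecutorService workers = Executors.newFixedThreadPool(4);
        for (int w = 0; w < 4; w++) {
            workers.execute(() -> {
                List<Integer> batch;
                while (!(batch = shared.nextBatch(10_000)).isEmpty()) {
                    indexed.addAndGet(batch.size()); // a real worker would send a bulk request here
                }
            });
        }
        workers.shutdown();
        workers.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("indexed " + indexed.get() + " docs");
    }
}
```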

Reducing the batch size to 10 000 did not change much: the total process
took about 1 second less, going from just over 29 seconds to 28. (Both
numbers are much lower than in my first post because I moved the importing
UI to my server, close to one of the ES nodes.)

Import CSV file took 28.069 secondes

Here is the insertion code. The Iterator is a CSV-reading iterator that
parses lines and returns Record instances (objects with generic object
values, indexed as strings). MAX_RECORDS is my batch size, set to 10 000.

public void insert(Iterator<Record> recordsIterator) {
    while (recordsIterator.hasNext()) {
        batchInsert(recordsIterator, MAX_RECORDS);
    }
}

private void batchInsert(Iterator<Record> recordsIterator, int limit) {
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    int processed = 0;
    try {
        logger.log(Level.INFO, "Adding records to bulk insert batch");
        while (recordsIterator.hasNext() && processed < limit) {
            processed++;
            Record record = recordsIterator.next();
            IndexRequestBuilder builder = client.prepareIndex(datasetName, RECORD);
            XContentBuilder data = jsonBuilder();
            data.startObject();
            for (ColumnMetadata column : dataset.getMetadata().getColumns()) {
                Object value = record.getCell(column.getName()).getValue();
                if (value == null || (value instanceof String && value.equals("NULL"))) {
                    value = null;
                }
                data.field(column.getNormalizedName(), value);
            }
            data.endObject();
            builder.setSource(data);
            bulkRequest.add(builder);
        }
        logger.log(Level.INFO, "Added " + bulkRequest.numberOfActions()
                + " records to bulk insert batch. Inserting batch...");
        long current = System.currentTimeMillis();
        BulkResponse bulkResponse = bulkRequest
                .setConsistencyLevel(WriteConsistencyLevel.ONE)
                .execute().actionGet();
        if (bulkResponse.hasFailures()) {
            logger.log(Level.SEVERE, "Could not index : " + bulkResponse.buildFailureMessage());
        }
        System.out.println(String.format("Bulk insert took %s secondes",
                NumberUtils.formatSeconds(
                        ((double) (System.currentTimeMillis() - current)) / 1000.0)));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

In reply to Frederic Esnault (Tuesday, June 24, 2014, 13:44:03 UTC+2).


(Cédric Hourcade) #6

Hello,

You can use the BulkProcessor class to do the work for you:
https://github.com/elasticsearch/elasticsearch/blob/master/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

Just configure/instantiate the class and .add() your index requests.
See: https://github.com/elasticsearch/elasticsearch/blob/master/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
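BulkProcessor is configured with a maximum number of actions per bulk, a maximum bulk size, the number of concurrent requests and an optional flush interval, and it reports each bulk to a listener. To illustrate the idea without depending on a particular Elasticsearch version, here is a much-simplified, JDK-only model of the "add() requests, flush every N actions, cap the number of in-flight bulks" behaviour. MiniBulkProcessor and its Sender interface are hypothetical names, not the Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified model of a bulk processor: add() buffers, full buffers are sent concurrently. */
final class MiniBulkProcessor<T> implements AutoCloseable {
    /** Hypothetical stand-in for executing one bulk request. */
    interface Sender<T> { void send(List<T> batch); }

    private final int bulkActions;
    private final Semaphore inFlight;              // caps concurrent bulk requests
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final Sender<T> sender;
    private List<T> buffer = new ArrayList<>();

    MiniBulkProcessor(int bulkActions, int concurrentRequests, Sender<T> sender) {
        this.bulkActions = bulkActions;
        this.inFlight = new Semaphore(concurrentRequests);
        this.sender = sender;
    }

    synchronized void add(T request) throws InterruptedException {
        buffer.add(request);
        if (buffer.size() >= bulkActions) flush();
    }

    synchronized void flush() throws InterruptedException {
        if (buffer.isEmpty()) return;
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        inFlight.acquire();                        // blocks the caller while too many are in flight
        executor.execute(() -> {
            try {
                sender.send(batch);
            } finally {
                inFlight.release();
            }
        });
    }

    @Override
    public synchronized void close() throws InterruptedException {
        flush();                                   // send the trailing partial batch
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong sent = new AtomicLong();
        try (MiniBulkProcessor<String> bulk =
                     new MiniBulkProcessor<>(1_000, 2, batch -> sent.addAndGet(batch.size()))) {
            for (int i = 0; i < 264_201; i++) bulk.add("doc-" + i);
        }
        System.out.println("sent " + sent.get() + " docs");
    }
}
```

The caller only calls add() and close(); batching and bounded concurrency are handled inside, which is the convenience the real class offers.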

Cédric Hourcade
ced@wal.fr

In reply to Frederic Esnault (Tue, Jun 24, 2014, 5:34 PM).


(Jörg Prante) #7

You should use the org.elasticsearch.action.bulk.BulkProcessor helper class
for concurrent bulk indexing.

Jörg

In reply to Frederic Esnault (Tue, Jun 24, 2014, 5:34 PM).


(phoenix) #8

Thanks to both of you, I'll look at this immediately!

On Tuesday, June 24, 2014 17:51:04 UTC+2, Jörg Prante wrote:

You should use the org.elasticsearch.action.bulk.BulkProcessor helper
class for concurrent bulk indexing.

Jörg
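Roughly, BulkProcessor buffers individual index requests and sends them as one bulk once a threshold is reached. The sketch below is not the Elasticsearch class; it is a hypothetical, single-threaded `MiniBulkBuffer` that assumes two thresholds (an action count and an approximate size, here counted in characters) and hands each full batch to a caller-supplied callback:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of BulkProcessor-style batching (NOT the Elasticsearch class):
// buffer JSON sources and pass them to a sender once a count or size limit is hit.
final class MiniBulkBuffer {
    private final int maxActions;
    private final long maxChars;
    private final Consumer<List<String>> sender;
    private final List<String> pending = new ArrayList<>();
    private long pendingChars = 0;

    MiniBulkBuffer(int maxActions, long maxChars, Consumer<List<String>> sender) {
        this.maxActions = maxActions;
        this.maxChars = maxChars;
        this.sender = sender;
    }

    void add(String jsonSource) {
        pending.add(jsonSource);
        pendingChars += jsonSource.length(); // rough size estimate (chars, not bytes)
        if (pending.size() >= maxActions || pendingChars >= maxChars) {
            flush();
        }
    }

    // Sends whatever is buffered; call it once more at the end for the last partial batch.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(pending));
        pending.clear();
        pendingChars = 0;
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        MiniBulkBuffer buffer = new MiniBulkBuffer(1000, 5_000_000, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 2500; i++) {
            buffer.add("{\"n\":" + i + "}");
        }
        buffer.flush(); // sends the remaining 500
        System.out.println(batchSizes); // [1000, 1000, 500]
    }
}
```

The real BulkProcessor also sends asynchronously; later in this thread it is configured with setBulkActions, setBulkSize, setFlushInterval and setConcurrentRequests.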

On Tue, Jun 24, 2014 at 5:34 PM, Frederic Esnault <esnault....@gmail.com> wrote:

Hi again,

Any idea how to parallelize the bulk insert process?
I tried creating 4 BulkInserters extending RecursiveAction and executing
them all, but the result was awful: 3 of them finished very slowly, one
did not finish at all (I don't know why), and I got only 70K docs in ES
instead of 265 000...
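One plausible cause of the missing documents is that all four tasks pulled from the same CSV iterator: a plain Iterator is not thread-safe, so concurrent hasNext()/next() calls can skip or corrupt records. A minimal sketch of a safer shape, assuming a hypothetical `indexBatch` function that stands in for executing one bulk request and returns how many documents it indexed: the iterator is read on one thread only, and only whole batches are handed to a fixed thread pool.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class ParallelBatchIndexer {
    // Reads the (non-thread-safe) iterator on the calling thread only and submits
    // each completed batch to a pool. indexBatch is a hypothetical stand-in for
    // one bulk request; it returns the number of documents indexed.
    static <T> int indexAll(Iterator<T> records, int batchSize, int threads,
                            Function<List<T>, Integer> indexBatch) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Integer>> results = new ArrayList<>();
        try {
            List<T> batch = new ArrayList<>(batchSize);
            while (records.hasNext()) {
                batch.add(records.next());
                if (batch.size() == batchSize) {
                    final List<T> toSend = batch;
                    results.add(pool.submit(() -> indexBatch.apply(toSend)));
                    batch = new ArrayList<>(batchSize);
                }
            }
            if (!batch.isEmpty()) { // last partial batch
                final List<T> toSend = batch;
                results.add(pool.submit(() -> indexBatch.apply(toSend)));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get(); // waits, and rethrows any failure from a batch
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

This version does not limit how many batches wait in the pool's queue, so for very large files the whole input can end up in memory; bounding the number of in-flight batches avoids that.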

Reducing the batch size to 10 000 did not make a big difference: the
total process took about 1 second less (the absolute numbers are much lower
than in my previous post because I moved the importing UI to my server, close
to one of the ES nodes). It was more than 29 seconds, now 28 seconds.

Import CSV file took 28.069 secondes

Here is the insertion code. The Iterator is a CSV reading iterator that
parses lines and returns Record instances (objects with generic Object
values, indexed as strings). MAX_RECORDS is my batch size, set to 10 000.

public void insert(Iterator<Record> recordsIterator) {
    while (recordsIterator.hasNext()) {
        batchInsert(recordsIterator, MAX_RECORDS);
    }
}

private void batchInsert(Iterator<Record> recordsIterator, int limit) {
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    int processed = 0;
    try {
        logger.log(Level.INFO, "Adding records to bulk insert batch");
        while (recordsIterator.hasNext() && processed < limit) {
            processed++;
            Record record = recordsIterator.next();
            IndexRequestBuilder builder = client.prepareIndex(datasetName, RECORD);
            XContentBuilder data = jsonBuilder();
            data.startObject();
            for (ColumnMetadata column : dataset.getMetadata().getColumns()) {
                Object value = record.getCell(column.getName()).getValue();
                if (value == null || (value instanceof String && value.equals("NULL"))) {
                    value = null;
                }
                data.field(column.getNormalizedName(), value);
            }
            data.endObject();
            builder.setSource(data);
            bulkRequest.add(builder);
        }
        logger.log(Level.INFO, "Added " + bulkRequest.numberOfActions()
                + " records to bulk insert batch. Inserting batch...");
        long current = System.currentTimeMillis();
        BulkResponse bulkResponse = bulkRequest
                .setConsistencyLevel(WriteConsistencyLevel.ONE).execute().actionGet();
        if (bulkResponse.hasFailures()) {
            logger.log(Level.SEVERE, "Could not index : " + bulkResponse.buildFailureMessage());
        }
        System.out.println(String.format("Bulk insert took %s secondes",
                NumberUtils.formatSeconds(((double) (System.currentTimeMillis() - current)) / 1000.0)));
    } catch (Exception e) {
        e.printStackTrace();
    }
}
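The per-row mapping in batchInsert (null or the literal string "NULL" becomes a JSON null) is easy to isolate and test on its own. A minimal sketch, assuming a row arrives as a map from normalized column name to cell value; `RecordSource.toSource` is a hypothetical helper, and its result is meant as the document source for one index request:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RecordSource {
    private RecordSource() {}

    // Hypothetical helper mirroring the loop in batchInsert: copy each cell into
    // the document, turning null and the literal string "NULL" into a JSON null.
    static Map<String, Object> toSource(Map<String, Object> cells) {
        Map<String, Object> source = new LinkedHashMap<>(); // keeps column order, allows null values
        for (Map.Entry<String, Object> cell : cells.entrySet()) {
            Object value = cell.getValue();
            if ("NULL".equals(value)) { // also covers the instanceof String check
                value = null;
            }
            source.put(cell.getKey(), value);
        }
        return source;
    }
}
```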

On Tuesday, June 24, 2014 13:44:03 UTC+2, Frederic Esnault wrote:

Thanks for all this.

I changed my conf, removed all the thread pool config, reduced the refresh
interval to 5s following Michael's advice, and limited my batches to 10 000.
I'll see how it works, then I'll parallelize the bulk insert.
I'll tell you how it turns out.

Thanks again !

(phoenix) #9

Wow, thanks a LOT, Cédric and Jörg!

Got down to 15.8 seconds for 264 000 documents.

Bulk processing took 15.863 seconds
Import CSV file took 15.874 secondes

If you have any further tuning tips, I'll take them too :)

For example, I didn't use the MultiGetRequestBuilder, just a new
IndexRequest for each doc.
Would it help to use MultiGet? I can't really figure out how to use it.

On Tuesday, June 24, 2014 17:48:43 UTC+2, Cédric Hourcade wrote:

Hello,

You can use the BulkProcessor class to do the work for you:

https://github.com/elasticsearch/elasticsearch/blob/master/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

Just configure/instantiate the class and .add() your index requests.
See:
https://github.com/elasticsearch/elasticsearch/blob/master/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java

Cédric Hourcade
c...@wal.fr


(Vladimir) #10

Hi everyone!

Basically we have the same problem. ES 1.6.
From the client we are trying to send 10k rows in one bulk request. We've tried to do this in 2 ways: via BulkRequestBuilder with our own "handmade" threads, and via BulkProcessor (5 concurrent threads). The data to send is 1.5-2 MB per bulk.

Here are the results for BulkProcessor.
In a line such as "On ES side = 6301, Our side MS: 9200, diff: 2899", 6301 is the time from BulkResponse.getTookInMillis(), 9200 is the time measured on our side (6301 plus transport overhead), and 2899 is the difference between the two.
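The three numbers in each line relate by simple subtraction. A minimal sketch, assuming a 10 000-document bulk as described above; `BulkTiming` is a hypothetical class that restates the arithmetic and adds a documents-per-second figure based on the client-side time:

```java
// Hypothetical value class restating the timing arithmetic from the log lines.
final class BulkTiming {
    final long esTookMillis; // "On ES side": took time reported in the bulk response
    final long clientMillis; // "Our side MS": wall-clock time measured by the client
    final int actions;       // documents in the bulk

    BulkTiming(long esTookMillis, long clientMillis, int actions) {
        this.esTookMillis = esTookMillis;
        this.clientMillis = clientMillis;
        this.actions = actions;
    }

    // Time spent outside the cluster's own measurement (roughly transport and waiting)
    long overheadMillis() {
        return clientMillis - esTookMillis;
    }

    // Throughput as seen by the client
    double docsPerSecond() {
        return actions * 1000.0 / clientMillis;
    }

    public static void main(String[] args) {
        BulkTiming t = new BulkTiming(6301, 9200, 10_000);
        System.out.println(t.overheadMillis()); // 2899
        System.out.println(t.docsPerSecond());
    }
}
```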

2015-07-02 18:19:22,561 [elasticsearch[Lighting Rod][transport_client_worker][T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: On ES side = 6301, Our side MS: 9200, diff: 2899
2015-07-02 18:19:22,562 [elasticsearch[Lighting Rod][transport_client_worker][T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: On ES side = 5779, Our side MS: 9295, diff: 3516
2015-07-02 18:19:22,566 [elasticsearch[Lighting Rod][transport_client_worker][T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: On ES side = 5498, Our side MS: 9713, diff: 4215
2015-07-02 18:19:22,571 [elasticsearch[Lighting Rod][transport_client_worker][T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: On ES side = 6008, Our side MS: 8264, diff: 2256
2015-07-02 18:19:22,573 [elasticsearch[Lighting Rod][transport_client_worker][T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: On ES side = 6455, Our side MS: 8362, diff: 1907
2015-07-02 18:19:22,578 [elasticsearch[Lighting Rod][transport_client_worker][T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: On ES side = 6037, Our side MS: 8471, diff: 2434
2015-07-02 18:19:22,585 [elasticsearch[Lighting Rod][transport_client_worker][T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: On ES side = 6962, Our side MS: 7393, diff: 431
2015-07-02 18:19:22,677 [elasticsearch[Lighting Rod][transport_client_worker][T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: On ES side = 6556, Our side MS: 7588, diff: 1032
2015-07-02 18:19:22,719 [elasticsearch[Lighting Rod][transport_client_worker][T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: On ES side = 6765, Our side MS: 7400, diff: 635
2015-07-02 18:19:23,145 [elasticsearch[Lighting Rod][transport_client_worker][T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: On ES side = 7021, Our side MS: 7303, diff: 282
2015-07-02 18:19:23,963 [elasticsearch[Lighting Rod][transport_client_worker][T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: On ES side = 7602, Our side MS: 7709, diff: 107
2015-07-02 18:19:24,195 [elasticsearch[Lighting Rod][transport_client_worker][T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: On ES side = 8245, Our side MS: 8448, diff: 203
2015-07-02 18:19:25,266 [elasticsearch[Lighting Rod][transport_client_worker][T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: On ES side = 2536, Our side MS: 2674, diff: 138
2015-07-02 18:19:25,266 [elasticsearch[Lighting Rod][transport_client_worker][T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: On ES side = 8738, Our side MS: 8888, diff: 150
2015-07-02 18:19:26,671 [elasticsearch[Lighting Rod][transport_client_worker][T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: On ES side = 3543, Our side MS: 10182, diff: 6639
2015-07-02 18:19:26,750 [elasticsearch[Lighting Rod][transport_client_worker][T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: On ES side = 3996, Our side MS: 4058, diff: 62
2015-07-02 18:19:28,039 [elasticsearch[Lighting Rod][transport_client_worker][T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: On ES side = 4681, Our side MS: 4787, diff: 106
2015-07-02 18:19:28,040 [elasticsearch[Lighting Rod][transport_client_worker][T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: On ES side = 4558, Our side MS: 4882, diff: 324
2015-07-02 18:19:28,476 [elasticsearch[Lighting Rod][transport_client_worker][T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: On ES side = 4506, Our side MS: 4552, diff: 46
2015-07-02 18:19:29,298 [elasticsearch[Lighting Rod][transport_client_worker][T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: On ES side = 5215, Our side MS: 5480, diff: 265
2015-07-02 18:19:29,712 [elasticsearch[Lighting Rod][transport_client_worker][T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: On ES side = 5251, Our side MS: 5693, diff: 442
2015-07-02 18:19:30,222 [elasticsearch[Lighting Rod][transport_client_worker][T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: On ES side = 5943, Our side MS: 6013, diff: 70
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /home/ElasticSearchSender-1.01-SNAPSHOT/java_pid16578.hprof ...

And here are the results for our own threads:
2015-07-02 18:09:25,185 [ElasticSearchSender-Thread-7-Client-7] INFO SenderThread - OK1: On ES side = 3411, Our side MS: 3470, diff: 59
2015-07-02 18:09:25,455 [ElasticSearchSender-Thread-5-Client-5] INFO SenderThread - OK1: On ES side = 3653, Our side MS: 3749, diff: 96
2015-07-02 18:09:25,634 [ElasticSearchSender-Thread-8-Client-8] INFO SenderThread - OK1: On ES side = 3781, Our side MS: 3914, diff: 133
2015-07-02 18:09:29,254 [ElasticSearchSender-Thread-1-Client-1] INFO SenderThread - OK1: On ES side = 5352, Our side MS: 5703, diff: 351
2015-07-02 18:09:29,674 [ElasticSearchSender-Thread-6-Client-6] INFO SenderThread - OK1: On ES side = 5592, Our side MS: 5800, diff: 208
2015-07-02 18:09:31,442 [ElasticSearchSender-Thread-4-Client-4] INFO SenderThread - OK1: On ES side = 6248, Our side MS: 6396, diff: 148
2015-07-02 18:09:31,924 [ElasticSearchSender-Thread-7-Client-7] INFO SenderThread - OK1: On ES side = 6300, Our side MS: 6601, diff: 301
2015-07-02 18:09:31,979 [ElasticSearchSender-Thread-3-Client-3] INFO SenderThread - OK1: On ES side = 6779, Our side MS: 6869, diff: 90
2015-07-02 18:09:40,108 [ElasticSearchSender-Thread-8-Client-8] INFO SenderThread - OK1: On ES side = 6605, Our side MS: 14383, diff: 7778
2015-07-02 18:09:40,108 [ElasticSearchSender-Thread-4-Client-4] INFO SenderThread - OK1: On ES side = 1806, Our side MS: 8575, diff: 6769
2015-07-02 18:09:40,109 [ElasticSearchSender-Thread-5-Client-5] INFO SenderThread - OK1: On ES side = 6797, Our side MS: 14472, diff: 7675
2015-07-02 18:09:40,118 [ElasticSearchSender-Thread-2-Client-2] INFO SenderThread - OK1: On ES side = 7014, Our side MS: 14792, diff: 7778
2015-07-02 18:09:40,118 [ElasticSearchSender-Thread-1-Client-1] INFO SenderThread - OK1: On ES side = 3760, Our side MS: 10747, diff: 6987
2015-07-02 18:09:40,118 [ElasticSearchSender-Thread-6-Client-6] INFO SenderThread - OK1: On ES side = 3175, Our side MS: 10261, diff: 7086
2015-07-02 18:09:43,219 [ElasticSearchSender-Thread-7-Client-7] INFO SenderThread - OK1: On ES side = 3037, Our side MS: 3097, diff: 60
2015-07-02 18:09:44,994 [ElasticSearchSender-Thread-1-Client-1] INFO SenderThread - OK1: On ES side = 4545, Our side MS: 4592, diff: 47
2015-07-02 18:09:45,174 [ElasticSearchSender-Thread-4-Client-4] INFO SenderThread - OK1: On ES side = 4578, Our side MS: 4787, diff: 209
2015-07-02 18:09:45,200 [ElasticSearchSender-Thread-3-Client-3] INFO SenderThread - OK1: On ES side = 4766, Our side MS: 4812, diff: 46
2015-07-02 18:09:45,214 [ElasticSearchSender-Thread-2-Client-2] INFO SenderThread - OK1: On ES side = 4763, Our side MS: 4814, diff: 51
2015-07-02 18:09:45,214 [ElasticSearchSender-Thread-5-Client-5] INFO SenderThread - OK1: On ES side = 4785, Our side MS: 4840, diff: 55
2015-07-02 18:09:45,497 [ElasticSearchSender-Thread-8-Client-8] INFO SenderThread - OK1: On ES side = 4860, Our side MS: 5074, diff: 214
2015-07-02 18:09:45,503 [ElasticSearchSender-Thread-6-Client-6] INFO SenderThread - OK1: On ES side = 4973, Our side MS: 5091, diff: 118
2015-07-02 18:09:46,987 [ElasticSearchSender-Thread-7-Client-7] INFO SenderThread - OK1: On ES side = 3540, Our side MS: 3585, diff: 45
2015-07-02 18:09:48,709 [ElasticSearchSender-Thread-1-Client-1] INFO SenderThread - OK1: On ES side = 3514, Our side MS: 3629, diff: 115
2015-07-02 18:09:50,491 [ElasticSearchSender-Thread-3-Client-3] INFO SenderThread - OK1: On ES side = 4849, Our side MS: 4903, diff: 54
2015-07-02 18:09:50,494 [ElasticSearchSender-Thread-2-Client-2] INFO SenderThread - OK1: On ES side = 4844, Our side MS: 4891, diff: 47

I believe it should be possible to save 1.5 MB much faster than in 4-5 s... Please tell me, what is wrong with our solution?
ES config:
index.number_of_shards: 5
index.number_of_replicas: 1
path.conf: /home/elasticsearch-1.6.0/config

bootstrap.mlockall: true

#index
indices.memory.index_buffer_size: 30%
index.store.type: mmapfs
#index.store.type: niofs
index.translog.flush_threshold_ops: 50000
index.refresh_interval: -1
indices.memory.min_shard_index_buffer_size: 100mb
indices.memory.min_index_buffer_size: 100mb

#throttle
index.store.throttle.max_bytes_per_sec: 60mb
indices.store.throttle.type: none

threadpool.bulk.queue_size: 3000
#indices.fielddata.cache.size: 25%

cluster.routing.allocation.disk.watermark.low: 80%
cluster.routing.allocation.disk.watermark.high: 50gb
cluster.info.update.interval: 1m
index.warmer.enabled: false

# Search pool
threadpool.search.type: fixed
threadpool.search.size: 20
threadpool.search.queue_size: 100

# Bulk pool
threadpool.bulk.type: fixed
#threadpool.bulk.size: 60
threadpool.bulk.queue_size: 3000

# Index pool
threadpool.index.type: fixed
threadpool.index.size: 20
threadpool.index.queue_size: 100

# Cache Sizes
indices.fielddata.cache.size: 15%
indices.fielddata.cache.expire: 6h
indices.cache.filter.size: 15%
indices.cache.filter.expire: 6h

index.merge.scheduler.max_thread_count: 1

bulk.udp.enabled: true
bulk.udp.host: "localhost"

marvel.agent.exporter.es.hosts: "http://10.1.24.93:9200"

Thank you very much!


(Jörg Prante) #11

You don't seem to evaluate the bulk responses before sending the next requests. Can you show your code?

Check BulkProcessor for handling a maximum number of outstanding bulk requests. The right maximum depends on the capacity (nodes, CPU threads) of your cluster.
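BulkProcessor's setConcurrentRequests caps how many bulk requests are outstanding at once. The same backpressure idea can be sketched with java.util.concurrent.Semaphore: take a permit before sending and give it back only when the request completes, whether it succeeds or fails. `BoundedBulkSender` and its `sendBulk` placeholder (which just sleeps briefly) are hypothetical; they are not the Elasticsearch client API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedBulkSender {
    private final Semaphore permits;
    private final ExecutorService io; // stands in for the client's network threads
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxObserved = new AtomicInteger();

    BoundedBulkSender(int maxConcurrentRequests, ExecutorService io) {
        this.permits = new Semaphore(maxConcurrentRequests);
        this.io = io;
    }

    // Blocks the producer while maxConcurrentRequests bulks are outstanding,
    // so unsent batches cannot pile up in memory without limit.
    CompletableFuture<Integer> send(List<String> batch) throws InterruptedException {
        permits.acquire();
        int now = inFlight.incrementAndGet();
        maxObserved.accumulateAndGet(now, Math::max);
        return CompletableFuture.supplyAsync(() -> sendBulk(batch), io)
                .whenComplete((n, err) -> { // runs on success and on failure
                    inFlight.decrementAndGet();
                    permits.release();
                });
    }

    // Hypothetical placeholder for one bulk round trip; returns the batch size.
    private static int sendBulk(List<String> batch) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch.size();
    }
}
```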


(Vladimir) #12

Hello, Jörg!

Here is the code. I think you are right about not evaluating the responses before the next requests:
probably we should create a single BulkProcessor instance and let it flush data by itself?
Also, could you check whether this is the correct way to create the IndexRequestBuilder and its source?

Thank you very much!!!

while (true) {
    // read data from file
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        long start1;

        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            start1 = System.currentTimeMillis();
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse bulkResponse) {
            long l = System.currentTimeMillis() - start1;
            long tookInMillis = bulkResponse.getTookInMillis();
            logger.info("OK2: On ES side = %s,\tOur side MS: %s,\tdiff: %s", tookInMillis, l, l - tookInMillis);
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            logger.debug("Error" + failure);
        }
    }).setConcurrentRequests(5).build();

    for (RecordList.Record record : recordList.getRecordList()) {
        try {
            IndexRequestBuilder indexRequestBuilder = client.prepareIndex(index, type);
            XContentBuilder contentBuilder = jsonBuilder().startObject(); //.prettyPrint();
            contentBuilder.field("loginName", record.loginName);
            contentBuilder.field("message", record.message);
            contentBuilder.field("threadName", record.threadName);
            contentBuilder.field("level", record.level);
            contentBuilder.field("date", record.date);
            contentBuilder.endObject();
            indexRequestBuilder.setSource(contentBuilder);

            bulkProcessor.add(indexRequestBuilder.request());
        } catch (Exception ex) {
            logger.error(ex, "ERROR create request object for file: %s. Message: %s", fileName, ex.getMessage());
        }
    }

    bulkProcessor.flush();
}


(Vladimir) #13

I've rewritten the code like this:
try {
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        long start1;

        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            start1 = System.currentTimeMillis();
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse bulkResponse) {
            long l = System.currentTimeMillis() - start1;
            long tookInMillis = bulkResponse.getTookInMillis();
            long estimatedSizeInBytes = request.estimatedSizeInBytes();
            int numberOfActions = request.numberOfActions();

            logger.info("OK2: Bytes=%s, Actions=%s, ES: %s,\tFull: %s,\tdiff: %s", estimatedSizeInBytes, numberOfActions,
                    tookInMillis, l, l - tookInMillis);
            // if everything went well, delete the file
            // deleteFile(finalRecordListFile);
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            logger.debug("Error" + failure);
            // if sending failed, put the file back into processing
            // resetFileName(finalRecordListFile);
        }
    }).setBulkActions(10000).setBulkSize(new ByteSizeValue(7, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(5)).setConcurrentRequests(5).build();

    while (true) {
        for (RecordList.Record record : recordList.getRecordList()) {
            try {
                IndexRequestBuilder indexRequestBuilder = client.prepareIndex(index, type);
                XContentBuilder contentBuilder = jsonBuilder().startObject(); //.prettyPrint();
                contentBuilder.field("loginName", record.loginName);
                contentBuilder.field("message", record.message);
                contentBuilder.field("threadName", record.threadName);
                contentBuilder.field("level", record.level);
                contentBuilder.field("date", record.date);
                contentBuilder.endObject();
                indexRequestBuilder.setSource(contentBuilder);

                bulkProcessor.add(indexRequestBuilder.request());
            } catch (Exception ex) {
                logger.error(ex, "ERROR create request object for file: %s. Message: %s", fileName, ex.getMessage());
            }
        }
        deleteFile(recordListFile);
    }

And here is what we see now... It is very strange that the bulk requests almost always have a constant size. At first I thought that beforeBulk had been invoked only once, but I also see a few different values. Strange.

[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 2038, Full: 128, diff: -1910
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1251, Full: 127, diff: -1124
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1496, Full: 210, diff: -1286
[T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1701, Full: 307, diff: -1394
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1551, Full: 307, diff: -1244
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1471, Full: 85, diff: -1386
[T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1532, Full: 427, diff: -1105
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1510, Full: 106, diff: -1404
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1475, Full: 264, diff: -1211
[T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1325, Full: 197, diff: -1128
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1558, Full: 384, diff: -1174
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1487, Full: 61, diff: -1426
[T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1542, Full: 176, diff: -1366
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1263, Full: 249, diff: -1014
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1439, Full: 271, diff: -1168
[T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1017, Full: 312, diff: -705
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1240, Full: 325, diff: -915
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1251, Full: 181, diff: -1070
[T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1483, Full: 277, diff: -1206
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1218, Full: 12, diff: -1206
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1466, Full: 354, diff: -1112
[T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1333, Full: 187, diff: -1146
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1278, Full: 53, diff: -1225
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1400, Full: 292, diff: -1108
[T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1493, Full: 300, diff: -1193
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1287, Full: 2, diff: -1285
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 2002, Full: 908, diff: -1094
[T#5]{New I/O worker #5}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1822, Full: 209, diff: -1613
[T#6]{New I/O worker #6}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1920, Full: 6, diff: -1914
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=1870000, Actions=10000, ES: 1776, Full: 148, diff: -1628
[T#4]{New I/O worker #4}] INFO SenderThread2 - OK2: Bytes=748, Actions=4, ES: 752, Full: 141, diff: -611
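The byte counts in this log are consistent with fixed-size actions: 1870000 bytes / 10000 actions = 187 bytes per action, and the last line gives 748 / 4 = 187 as well. A minimal sketch of a count-or-size flush policy (a simplified model, not the actual BulkProcessor source) shows why the batches look constant. It assumes the byte limit was left at 5 MB (5 * 1024 * 1024 bytes) and uses the 300004 actions visible above (30 full batches plus the final 4). Since 10000 actions of 187 bytes add up to only about 1.87 MB, the action limit always trips first, so every batch is exactly 10000 actions and only the final flush is smaller.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (not Elasticsearch code) of a count-or-size flush policy:
 * a batch is sent as soon as it reaches maxActions actions OR maxBytes bytes,
 * and whatever is left over is sent when the producer closes.
 */
class FlushPolicySketch {

    /** Returns the number of actions in each batch that would be sent. */
    static List<Integer> batchSizes(int totalActions, int maxActions,
                                    long maxBytes, long bytesPerAction) {
        List<Integer> batches = new ArrayList<>();
        int actions = 0;
        long bytes = 0;
        for (int i = 0; i < totalActions; i++) {
            actions++;
            bytes += bytesPerAction;
            // Flush as soon as either limit has been reached.
            if (actions >= maxActions || bytes >= maxBytes) {
                batches.add(actions);
                actions = 0;
                bytes = 0;
            }
        }
        if (actions > 0) {
            batches.add(actions); // final partial batch, flushed on close
        }
        return batches;
    }

    public static void main(String[] args) {
        // 187 bytes per action, derived from the log (1870000 / 10000).
        List<Integer> batches = batchSizes(300_004, 10_000, 5L * 1024 * 1024, 187);
        System.out.println(batches.size() + " batches, first = " + batches.get(0)
                + " actions, last = " + batches.get(batches.size() - 1) + " actions");
    }
}
```

Run as written, it prints `31 batches, first = 10000 actions, last = 4 actions`, which is the same shape as the log.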


(Jörg Prante) #14

By default, BulkProcessor limits each bulk request to 5 MB and 1000 actions. What are your experiences with the default settings?
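Which of the two limits actually triggers depends on the document size. A small arithmetic sketch, assuming fixed-size actions of 187 bytes (the 1870000 bytes / 10000 actions reported in the log) and that "5 MB" means 5 * 1024 * 1024 bytes, computes how many actions fit before each limit is hit. The class and method names are illustrative only.

```java
/**
 * For fixed-size actions: how many actions go into one bulk request
 * when a request is flushed at maxActions actions OR maxBytes bytes.
 */
class BulkLimitMath {

    /** Smallest n with n * bytesPerAction >= maxBytes (ceiling division). */
    static long actionsToReachBytes(long maxBytes, long bytesPerAction) {
        return (maxBytes + bytesPerAction - 1) / bytesPerAction;
    }

    /** Actions per request under both limits: whichever is reached first wins. */
    static long actionsPerRequest(long maxActions, long maxBytes, long bytesPerAction) {
        return Math.min(maxActions, actionsToReachBytes(maxBytes, bytesPerAction));
    }

    public static void main(String[] args) {
        long fiveMb = 5L * 1024 * 1024; // 5242880 bytes
        long perAction = 187;           // bytes per action, from the log
        System.out.println("byte limit alone: " + actionsToReachBytes(fiveMb, perAction));
        System.out.println("defaults (1000 actions): " + actionsPerRequest(1_000, fiveMb, perAction));
        System.out.println("10000 actions: " + actionsPerRequest(10_000, fiveMb, perAction));
    }
}
```

It prints 28037, 1000 and 10000: with documents of this size the action limit decides the request size in both configurations, and a 10000-action request is about 1.87 MB (10000 * 187 bytes).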

You set concurrent bulk requests to 5. Why 5? You should log the concurrency level in afterBulk so you can see how fast your cluster responds. Maybe a higher number makes sense; it depends on how powerful your cluster is.
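One way to log the concurrency level is a counter that goes up when a bulk request starts and down when it finishes. The sketch below is a hypothetical helper, not part of the Elasticsearch API; the assumption is that you call its methods from the beforeBulk and afterBulk callbacks of your own BulkProcessor.Listener, passing the execution id and, for example, the took time reported by the bulk response.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not an Elasticsearch class): tracks how many bulk
 * requests are in flight and the highest count seen so far.
 */
class InFlightTracker {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    /** Call when a bulk request is about to be sent; returns the new in-flight count. */
    int beforeBulk(long executionId) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max); // remember the peak
        return now;
    }

    /** Call when the response or failure arrives; returns the remaining in-flight count. */
    int afterBulk(long executionId, long tookMillis) {
        int now = inFlight.decrementAndGet();
        System.out.printf("bulk %d took %d ms; in flight: %d; max so far: %d%n",
                executionId, tookMillis, now, maxInFlight.get());
        return now;
    }

    int maxInFlight() {
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        InFlightTracker tracker = new InFlightTracker();
        tracker.beforeBulk(1);
        tracker.beforeBulk(2);
        tracker.afterBulk(1, 1500);
        tracker.afterBulk(2, 1300);
        System.out.println("max concurrent bulk requests: " + tracker.maxInFlight());
    }
}
```

If the peak stays well below the configured number of concurrent requests, the cluster is keeping up and raising the setting is unlikely to help; if it sits at the limit, a higher value may be worth trying.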

If your setting of 10000 actions accumulates into a huge bulk request of more than 5 MB, you should decrease the number of actions per bulk request. It takes a few tries to find the best number of actions, because it depends on your cluster's power and your index setup.
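The "few tries" can be scripted. A minimal sketch of such a trial loop: for each candidate number of actions per request it sends a sample of documents in batches of that size and keeps the size with the highest documents per second. `indexBatch` is a hypothetical stand-in for your own code that builds and executes one bulk request of n actions and returns the elapsed milliseconds; it is not an Elasticsearch API.

```java
import java.util.function.IntToLongFunction;

/**
 * Sketch of a trial-and-error search for the bulk size. indexBatch is
 * hypothetical: it should send one bulk request of n actions and return
 * how long it took, in milliseconds.
 */
class BulkSizeTrial {

    static int bestBatchSize(int[] candidates, int sampleDocs, IntToLongFunction indexBatch) {
        int best = candidates[0];
        double bestRate = -1;
        for (int size : candidates) {
            int batches = Math.max(1, sampleDocs / size);
            long totalMillis = 0;
            for (int b = 0; b < batches; b++) {
                totalMillis += indexBatch.applyAsLong(size);
            }
            // Throughput for this candidate, in documents per second.
            double docsPerSecond = (batches * (double) size) / Math.max(1, totalMillis) * 1000.0;
            System.out.printf("%6d actions/request: %.0f docs/s%n", size, docsPerSecond);
            if (docsPerSecond > bestRate) {
                bestRate = docsPerSecond;
                best = size;
            }
        }
        return best;
    }
}
```

In a real test, each candidate should be run more than once against an index with the same mappings and settings as production, since timings vary with cluster load and background merging.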


(system) #15