Bulk indexing, how to prevent flooding?


(Jörg Prante) #1

Hi,

how can I prevent bad clients from flooding an Elasticsearch cluster,
especially when using bulk indexing?

Imagine remote indexing by a TransportClient with bad habits, i.e. it
ignores the responses delivered to its ActionListener and keeps
submitting bulk index requests.

I exercised a cluster that way, and sooner or later the RHEL6 server
hosting the Elasticsearch master node starts to drown. Even with a max
open files setting of more than 60,000, the JDK starts to report too
many open files, obviously because of the pile of open network
connections. The shell stopped working; bash could not execute
commands and reported messages like

-bash: start_pipeline: pgrp pipe: Too many open files in system
-bash: /bin/ls: Too many open files in system

About 10 minutes after stopping the bad client, once the JVM has
garbage-collected and possibly worked through the pile of Java
exceptions such as

[2011-04-21 15:40:54,257][WARN ][netty.channel.socket.nio.NioServerSocketPipelineSink] Failed to accept a connection.
java.io.IOException: Zu viele offene Dateien im System
    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:152)
    at org.elasticsearch.common.netty.channel.socket.nio.NioServerSocketPipelineSink$Boss.run(NioServerSocketPipelineSink.java:244)
    at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

("Zu viele offene Dateien im System" = "Too many open files in system")

the system becomes responsive again and the "too many files" message
disappears.

Because I would like a fool-proof multi-tenancy setup with public
access for remote indexing/search via a reverse proxy, I am very
interested in ways to prevent an ElasticSearch cluster from being
flooded via the (bulk) index API by TransportClients.

You might call this a feature request for QoS in ElasticSearch (bulk)
indexing.

Can someone give me a hint how to realize this feature? Thank you in
advance for your kind help!

Best Regards,

Jörg


(David Williams) #2

The reverse proxy could limit the size of the uploads & the number of
concurrent requests per IP to some reasonable level.

But it's not just bulk indexing you'd need to worry about; there are
different kinds of searches you'd have to worry about too (sorting
large numbers of results being the most obvious one). It's difficult
for ElasticSearch to determine reasonable values for any of these
without adding lots and lots of complexity to it. So your best bet, in
my opinion, would be to determine what reasonable limits are for your
use case and have the proxy enforce those limits. Fool-proof
multi-tenancy with public access is going to require you to write
intelligence into the proxy anyway to enforce tenant isolation and
security. Extending it to add per-user resource quotas is only a
little bit harder.
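
For illustration, the two proxy-side limits mentioned above (upload
size and concurrent requests per IP) could look roughly like this in a
Java-based proxy. This is only a sketch: the class PerClientLimiter,
its method names, and the limit values are hypothetical and are not
part of ElasticSearch or of any particular proxy product.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/** Hypothetical limiter a reverse proxy could consult before forwarding a request. */
final class PerClientLimiter {
    private final int maxConcurrent;   // assumed cap on in-flight requests per client IP
    private final long maxBodyBytes;   // assumed cap on the size of a single upload
    private final ConcurrentHashMap<String, Semaphore> inFlight = new ConcurrentHashMap<>();

    PerClientLimiter(int maxConcurrent, long maxBodyBytes) {
        this.maxConcurrent = maxConcurrent;
        this.maxBodyBytes = maxBodyBytes;
    }

    /** Returns true if the request may be forwarded; the caller must call release() when it finishes. */
    boolean tryAcquire(String clientIp, long bodyBytes) {
        if (bodyBytes > maxBodyBytes) {
            return false; // upload too large, reject without taking a slot
        }
        Semaphore slots = inFlight.computeIfAbsent(clientIp, ip -> new Semaphore(maxConcurrent));
        return slots.tryAcquire(); // non-blocking: false means the client is over its limit
    }

    /** Frees the slot taken by a successful tryAcquire() for this client. */
    void release(String clientIp) {
        Semaphore slots = inFlight.get(clientIp);
        if (slots != null) {
            slots.release();
        }
    }
}
```

A proxy would call tryAcquire() when a request arrives, answer with an
error status if it returns false, and call release() once the upstream
response has been relayed.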

-david


(Shay Banon) #3

Yes, there is no good logic to throttle (bulk) indexing operations, and you can get into problems.

There has been work done in 0.15 with different thread pools, so one can configure the indexing thread pool to have a bounded size and to block (with a timeout). This is not a perfect solution, but it can help. (The thread pool settings are not really documented; it's pretty advanced stuff.) But I think the best place to put it is, as suggested, on the proxy side.

By the way, are you reusing the same TransportClient across requests? If you do, then new connections will not be created.
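
To make the idea of a bounded, blocking pool with a timeout concrete, here is a minimal sketch in plain Java. It is not the ElasticSearch thread pool implementation and uses none of its settings; the class BlockingBoundedExecutor and its parameters are hypothetical. The point is only that a submitter waits for a free slot for a limited time and is rejected afterwards.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical bounded executor: submit() blocks up to a timeout when all slots are taken. */
final class BlockingBoundedExecutor {
    private final ExecutorService workers;
    private final Semaphore slots;       // one permit per running or queued task
    private final long timeoutMillis;

    BlockingBoundedExecutor(int threads, int queueSlots, long timeoutMillis) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.slots = new Semaphore(threads + queueSlots);
        this.timeoutMillis = timeoutMillis;
    }

    /** Waits for a free slot; throws RejectedExecutionException if none frees up in time. */
    void submit(Runnable task) {
        boolean acquired;
        try {
            acquired = slots.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for a slot", e);
        }
        if (!acquired) {
            throw new RejectedExecutionException("no free slot within " + timeoutMillis + " ms");
        }
        try {
            workers.execute(() -> {
                try {
                    task.run();
                } finally {
                    slots.release(); // free the slot whether the task succeeded or failed
                }
            });
        } catch (RejectedExecutionException e) {
            slots.release(); // pool already shut down: give the slot back
            throw e;
        }
    }

    void shutdown() {
        workers.shutdown();
    }
}
```

While the caller is blocked in submit(), it cannot hand over more work, which is the back-pressure effect described above.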

-shay.banon


(Jörg Prante) #4

I am aware of Apache mod_qos, but I'm rather anxious about using it
for ES indexing. I do not assume that shaping traffic on port 9300
between client and server gives unpredictable behavior; it's just the
amount of extra work on top of ES programming.

My use case is highly-structured data with well-defined
characteristics, a lot of fields, short values, but only a few
reasonable types of boolean queries. For query control, I have some
methods prepared, one is query language translation, i.e. mapping from
a simple query language domain into an ES DSL query, so I am able to
add constraints to ES requests easily.

True, there will be lag when sorting large result sets. I know this
will also overwhelm the ES cluster in certain situations. I know of no
solution for this class of problems, so a reverse proxy will have to
know about some 'evil queries' and will have to cut off 'evil' sort
requests in an ad-hoc manner. Other candidates for evil queries are
some weird wildcard searches.

Full tenant isolation requires authentication besides quotas. And I
need to provide "search views" via the reverse proxy. A "search view"
provides styles for viewing the same ES documents in different ways:
only a subset of fields, an XML representation via Atom feeds, etc.
But that's a different topic from QoS.

Jörg


(Jörg Prante) #5

The TransportClients are not being closed and opened for each request,
just once opened at the beginning.

In my setup I used 12 bzip2-compressed data files (containers for a
total of ~15 million docs of ~3 KB each) with 12 TransportClients in
parallel threads on a client machine. The client machine was separate
from the ES cluster because the task of transforming the raw docs into
ES XContent is nontrivial and CPU-intensive. Anyway, I could push
6 MB/sec over the wire for about an hour to a 3-node ES cluster. Each
bulk request had 300 docs.

The bulk indexing variant I selected was intentionally brute force: it
did not care about the number of currently active bulk requests, so
the system drowned. It's not a big challenge to find a suitable bulk
request limit for a well-behaved client when you control both the
server and the client side.
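
As a sketch of what such a limit for a well-behaved client could look
like: a semaphore caps the number of bulk requests in flight, and a
slot is returned when the asynchronous response (or failure) arrives.
The class InFlightLimiter is hypothetical, and CompletableFuture only
stands in for the real asynchronous bulk call; no ElasticSearch API is
used here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical client-side cap on the number of concurrently active bulk requests. */
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Blocks until a slot is free, starts the async call, and frees the slot on completion. */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> asyncCall) {
        permits.acquireUninterruptibly();
        CompletableFuture<T> response;
        try {
            response = asyncCall.get();
        } catch (RuntimeException e) {
            permits.release(); // the call never started, so give the slot back
            throw e;
        }
        // Release on success and on failure alike.
        return response.whenComplete((result, error) -> permits.release());
    }

    /** Number of bulk requests that could be started right now without blocking. */
    int available() {
        return permits.availablePermits();
    }
}
```

With 12 producer threads sharing one limiter, the producers simply stall
in submit() whenever the cluster falls behind, instead of piling up
requests.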

How about putting a simple rate limit control into
bulkrequest.execute(), hindering a remote client from executing too
much 'client bulk volume' in too short a period of time?

An upper rate limit could be given in the ES configuration; if it is
missing, the 'client bulk volume' rate is unlimited. I think this
could help a little bit in dealing with remote clients that have bad
habits.
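
To illustrate the proposal, here is a token-bucket sketch of such a
volume limit, with "unlimited" as the default when no rate is
configured. Nothing like this exists in ES as far as this thread
states; the class BulkVolumeLimiter, its parameters, and the injected
clock are hypothetical.

```java
import java.util.function.LongSupplier;

/** Hypothetical token bucket limiting bulk volume in bytes per second. */
final class BulkVolumeLimiter {
    private final long bytesPerSecond;   // <= 0 means no limit is configured
    private final long burstBytes;       // maximum volume that may be sent at once
    private final LongSupplier nanoClock;
    private double tokens;
    private long lastNanos;

    BulkVolumeLimiter(long bytesPerSecond, long burstBytes, LongSupplier nanoClock) {
        this.bytesPerSecond = bytesPerSecond;
        this.burstBytes = burstBytes;
        this.nanoClock = nanoClock;
        this.tokens = burstBytes;               // start with a full bucket
        this.lastNanos = nanoClock.getAsLong();
    }

    /** Returns true if a bulk request of the given size is within the configured rate. */
    synchronized boolean tryConsume(long requestBytes) {
        if (bytesPerSecond <= 0) {
            return true; // no limit configured: unlimited
        }
        long now = nanoClock.getAsLong();
        double refill = (now - lastNanos) / 1e9 * bytesPerSecond;
        tokens = Math.min(burstBytes, tokens + refill);
        lastNanos = now;
        if (requestBytes > tokens) {
            return false; // over the limit; the caller should reject or delay the request
        }
        tokens -= requestBytes;
        return true;
    }
}
```

In production the clock would be System::nanoTime; passing it in makes
the limiter easy to test with a fake clock.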

Jörg


(Shay Banon) #6

The TransportClient is thread safe, so there is no need to create one per thread (if I understood correctly, you might already be using one per thread).

Having throttling support on the client side is nice, but it does not solve the real problem, since you might have many clients indexing data, and you need to take that into account. More specifically, we can have a throttling value per node on the number of concurrent bulk requests executed, but then it needs to bubble up all the way to the communication layer to stop accepting new bulk requests (otherwise we will get a very large queue of pending bulk requests). This is what the indexing thread pool does (assuming it's configured to be blocking).


(Jörg Prante) #7

I admit I do not completely understand the communication layer thing
and the need for bubbling up information. The indexing thread pool
could be a little bit late in the show to handle critical situations
when servers are about to be overwhelmed. Maybe TransportClient needs
some rejection actions or emergency disconnects (HTTP "503 Service
Unavailable" or equivalent IOExceptions) to avoid flooding. The closer
to the TCP/IP layer the better, because this should free file handles.

Well, I will study whether it is possible to extend TransportStats,
TransportService, TransportClient and friends with some throughput
stats (volume rates) for the last seconds, minutes, etc. Maybe more
TransportClient accounting is needed; maybe only per-node throughput
stats, rather than per-client ones, are feasible. That way I hope to
get some more insight into what is really going on when
TransportClients with lots of bulk requests are connected.

It should be fairly straightforward: if throughput stats are going
through the roof, the handleRequest() method in RestBulkAction (or
even a method earlier in the chain) should return HTTP 503 rejections
before delving into bulkRequest.add() invocations. Maybe per client,
maybe for all kinds of bulk requests when rate limits are exceeded.
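
A rough sketch of that idea, independent of the actual ES classes:
throughput is tracked in one-second buckets over a sliding window, and
a request is answered with 503 when admitting it would exceed the
allowed volume. The class ThroughputWindow, its methods, and the
thresholds are hypothetical; how this would hook into RestBulkAction is
not shown.

```java
import java.util.Arrays;

/** Hypothetical sliding-window volume counter with an admit/reject decision. */
final class ThroughputWindow {
    private final long[] bytes;   // bytes recorded per one-second bucket
    private final long[] stamp;   // the second each bucket currently belongs to

    ThroughputWindow(int windowSeconds) {
        bytes = new long[windowSeconds];
        stamp = new long[windowSeconds];
        Arrays.fill(stamp, -1L);  // -1 marks a bucket that was never used
    }

    /** Adds n bytes to the bucket for the given time, recycling stale buckets. */
    synchronized void record(long nowMillis, long n) {
        long second = nowMillis / 1000;
        int i = (int) (second % bytes.length);
        if (stamp[i] != second) {
            stamp[i] = second;
            bytes[i] = 0;
        }
        bytes[i] += n;
    }

    /** Sums the volume of all buckets that still fall inside the window. */
    synchronized long bytesInWindow(long nowMillis) {
        long second = nowMillis / 1000;
        long total = 0;
        for (int i = 0; i < bytes.length; i++) {
            if (stamp[i] >= 0 && second - stamp[i] < bytes.length) {
                total += bytes[i];
            }
        }
        return total;
    }

    /** Returns 200 and records the request, or 503 if it would exceed the window limit. */
    synchronized int admit(long nowMillis, long requestBytes, long maxBytesPerWindow) {
        if (bytesInWindow(nowMillis) + requestBytes > maxBytesPerWindow) {
            return 503;
        }
        record(nowMillis, requestBytes);
        return 200;
    }
}
```

One instance per node gives per-node stats; keeping one instance per
client in a map would give the per-client variant.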

I do not dare to promise a patch; maybe my idea of extending the
stats is going in the wrong direction.

Jörg


(Shay Banon) #8

The TransportClient is the wrong place to do it, since you might have many of those. The thread pool is the better place because a blocking thread pool, once it overflows, blocks the IO thread from reading more data. The number of connections remains the same if you use the same TransportClient.

On Saturday, April 23, 2011 at 10:42 AM, jprante wrote:

I admit I do not completely understand the communication layer and
the need for bubbling up information. The indexing thread pool
could be a little bit late in the show to handle critical situations
when servers are about to be overwhelmed. Maybe TransportClient needs
some rejection actions or emergency disconnects (HTTP "503 Service
Unavailable" or equivalent IOExceptions) to avoid flooding. The closer
to the TCP/IP layer the better, because this should release file handles.

Well, I will study whether it is possible to extend TransportStats,
TransportService, TransportClient and friends with some throughput
stats (volume rates) for the last seconds, minutes, etc. Maybe more
TransportClient accounting is needed; maybe only per-node throughput
stats, rather than per-client ones, are feasible. That way I hope to get
some more insight into what is really going on when TransportClients with
lots of bulk requests are connected.

It should be fairly easy to predict that, if throughput stats are
going through the roof, the handleRequest() method in RestBulkAction
(or even a method earlier in the chain) should return HTTP 503
rejections before delving into bulkRequest.add() invocations. Maybe
per client, maybe for all kinds of bulk requests when rate limits
are exceeded.
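
To make the idea concrete, here is a minimal sketch of a sliding-window throughput meter that a request handler could consult before doing any work. ThroughputMeter and its methods are hypothetical names, not existing ElasticSearch classes, and the caller passes in the current time so the logic is easy to check.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical meter: tracks the bytes received within a sliding time window. */
class ThroughputMeter {
    private static final class Sample {
        final long timeMillis;
        final long bytes;
        Sample(long timeMillis, long bytes) {
            this.timeMillis = timeMillis;
            this.bytes = bytes;
        }
    }

    private final long windowMillis;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long bytesInWindow = 0;

    ThroughputMeter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records the size of an incoming request at the given time. */
    synchronized void record(long nowMillis, long bytes) {
        evictOld(nowMillis);
        samples.addLast(new Sample(nowMillis, bytes));
        bytesInWindow += bytes;
    }

    /** Returns the number of bytes recorded during the last windowMillis. */
    synchronized long bytesInWindow(long nowMillis) {
        evictOld(nowMillis);
        return bytesInWindow;
    }

    /** Returns 503 if the recent volume exceeds the limit, otherwise 200. */
    synchronized int statusFor(long nowMillis, long maxBytesPerWindow) {
        return bytesInWindow(nowMillis) > maxBytesPerWindow ? 503 : 200;
    }

    // Drops samples that are at least one full window old.
    private void evictOld(long nowMillis) {
        while (!samples.isEmpty() && nowMillis - samples.peekFirst().timeMillis >= windowMillis) {
            bytesInWindow -= samples.pollFirst().bytes;
        }
    }
}
```

One meter per node gives per-node stats; keeping a map of meters keyed by client address would give per-client stats at the cost of more bookkeeping.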

I do not dare to promise a patch; maybe my idea of extending the
stats is going in the wrong direction.

Jörg

On Apr 21, 11:55 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

The TransportClient is thread safe, so no need to create one per thread (if I understood correctly, you might already be using one per thread).

Having throttling support on the client side is nice, but it does not solve the real problem, since you might have many clients indexing data, and you need to take that into account. More specifically, we can have a per-node throttling value on the number of concurrent bulk requests executed, but then it needs to bubble up all the way to the communication layer to stop accepting new bulk requests (otherwise we will get a very large queue of pending bulk requests). This is what the indexing thread pool does (assuming it's configured to be blocking).

On Friday, April 22, 2011 at 12:51 AM, jprante wrote:

The TransportClients are not being closed and opened for each request,
just once opened at the beginning.

In my setup I used 12 bzip2-compressed data files (containers for a
total of ~15 million docs, each ~3K in size) with 12
TransportClients in parallel threads on a client machine. The client
machine was detached from the ES cluster because the task of
transforming the raw docs before getting them into ES XContent is
nontrivial and CPU-intensive. Anyway, I could push 6 MB/sec over the
wire for about an hour to a 3-node ES cluster. Each bulk request had
300 docs.

The bulk indexing variant I selected was intentionally brute force: it
did not care about the number of currently active bulk requests, so
the system drowned. It's not a big challenge to find a suitable bulk
request limit for a well-behaved client when you control both
server and client sides.

How about the idea of putting a kind of simple rate limit control
into bulkrequest.execute(), hindering a remote client from executing
too much 'client bulk volume' in too short a period of time?

An upper rate limit could be given in the ES configuration; if it is
missing, the 'client bulk volume' rate is unlimited. I think this
could help a little bit in dealing with badly behaved remote clients.
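
One way such a limit could work is a token bucket counted in bytes, sketched below. BulkVolumeLimiter is a hypothetical class, and the "configured rate" stands in for a setting that does not exist in ES; a null value models the missing setting and means unlimited.

```java
/** Hypothetical volume limiter: a token bucket where one token is one byte of bulk payload. */
class BulkVolumeLimiter {
    private final double bytesPerSecond; // refill rate; infinite means "no limit configured"
    private final double burstBytes;     // bucket capacity
    private double available;
    private long lastRefillNanos;

    /** configuredBytesPerSecond is null when the (assumed) setting is absent. */
    BulkVolumeLimiter(Long configuredBytesPerSecond, long burstBytes, long nowNanos) {
        this.bytesPerSecond = configuredBytesPerSecond == null
                ? Double.POSITIVE_INFINITY
                : configuredBytesPerSecond.doubleValue();
        this.burstBytes = burstBytes;
        this.available = burstBytes;
        this.lastRefillNanos = nowNanos;
    }

    /** Returns true if a request of the given size may run now, false if it should be rejected. */
    synchronized boolean tryConsume(long requestBytes, long nowNanos) {
        if (Double.isInfinite(bytesPerSecond)) {
            return true; // no limit configured
        }
        // Refill in proportion to the elapsed time, capped at the bucket capacity.
        double elapsedSeconds = (nowNanos - lastRefillNanos) / 1_000_000_000.0;
        available = Math.min(burstBytes, available + elapsedSeconds * bytesPerSecond);
        lastRefillNanos = nowNanos;
        if (requestBytes > available) {
            return false;
        }
        available -= requestBytes;
        return true;
    }
}
```

The burst size decides how large a single bulk request can be at all: a request bigger than the bucket capacity is always rejected, so it would have to be at least as large as the biggest bulk request you want to allow.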

Jörg

On Apr 21, 7:50 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

Yes, there is no good logic to throttle (bulk) indexing operations, and you can get into problems.

There has been work done in 0.15 with different thread pools, so one can configure the indexing thread pool to have a bounded size and have it block (with a timeout). This is not a perfect solution, but it can help. (All the TP stuff is not really documented; it's pretty advanced stuff.) But I think the best place to put it is, as suggested, on the proxy side.
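
For readers unfamiliar with the pattern, a bounded pool that blocks with a timeout can be sketched with plain java.util.concurrent. This is not ElasticSearch's own thread pool code or configuration; BlockingPools and its parameters are illustrative names only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical factory for a fixed-size pool whose submitters block when the queue is full. */
class BlockingPools {
    static ThreadPoolExecutor newBoundedBlockingPool(int threads, int queueSize, long timeoutMillis) {
        // Called by execute() when all threads are busy and the queue is full.
        RejectedExecutionHandler blockWithTimeout = (task, executor) -> {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            try {
                // Block the submitting thread until there is room, or give up after the timeout.
                if (!executor.getQueue().offer(task, timeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new RejectedExecutionException(
                            "queue still full after " + timeoutMillis + " ms");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        };
        return new ThreadPoolExecutor(
                threads, threads,               // fixed number of worker threads
                0L, TimeUnit.MILLISECONDS,      // no keep-alive needed for a fixed pool
                new ArrayBlockingQueue<>(queueSize),
                blockWithTimeout);
    }
}
```

Because the submitting thread is the one that blocks, a network IO thread handing work to such a pool stops reading from its sockets while it waits, which is what pushes the pressure back onto the sender.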

By the way, are you reusing the same TransportClient across requests? If you do, then new connections will not be created.

-shay.banon


(system) #9