BulkResponse throws EOFException


(jason) #1

Hi everyone,

I am trying to send a bulk request to Elasticsearch, and I am
getting an exception. Please see the code and the stack trace below:

    LinkedList<XContentBuilder> queue = .....// assume the queue is populated
    BulkRequestBuilder request = client.prepareBulk();
    int bulkSize = 2;
    for (int i = 0; i < bulkSize; i++) {
        XContentBuilder jsonObj = queue.remove();
        request.add(Requests.indexRequest(handler.getIndex())
                .type(handler.getType())
                .source(jsonObj));
    }

    BulkResponse response = request.execute().actionGet();  // throws exception at this line
    if (response.hasFailures()) {
        System.err.println("--> failures...");
    }

Exception in thread "Thread-1" org.elasticsearch.transport.RemoteTransportException: [Black Cat][inet[/10.32.40.17:9300]][indices/bulk]
Caused by: java.io.EOFException
    at org.elasticsearch.common.io.stream.LZFStreamInput.readBytes(LZFStreamInput.java:97)
    at org.elasticsearch.common.io.stream.StreamInput.readUTF(StreamInput.java:123)
    at org.elasticsearch.common.io.stream.HandlesStreamInput.readUTF(HandlesStreamInput.java:49)
    at org.elasticsearch.action.support.replication.ShardReplicationOperationRequest.readFrom(ShardReplicationOperationRequest.java:132)
    at org.elasticsearch.action.index.IndexRequest.readFrom(IndexRequest.java:573)
    at org.elasticsearch.action.bulk.BulkRequest.readFrom(BulkRequest.java:261)
    at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:180)
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:85)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:302)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
    at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:51)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:540)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:280)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:200)
    at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

If you have encountered such an error in the past, could you please
suggest what its most probable cause is?

Thank you,
Eugene.


(Shay Banon) #2

Don't use a list of XContentBuilders; there is no need for it, and it is what causes the problem, because of buffer reuse. Simply use prepareBulk, save the BulkRequestBuilder, and use that as your aggregator.

(In reply to eugene's message of Tuesday, April 12, 2011 at 9:30 PM.)
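
The buffer-reuse hazard mentioned in this reply can be sketched without Elasticsearch at all. The class below is a hypothetical stand-in, not the library's implementation: it assumes a builder that writes every document into one shared, cached buffer, and shows what happens when references to that builder are queued instead of copies of its bytes. It illustrates overwritten content only; it does not reproduce the EOFException itself.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a builder backed by one cached buffer (an assumption, not the ES code). */
final class ReusedBufferDemo {
    // A single shared buffer that is reset on every build, mimicking buffer reuse.
    private static final ByteArrayOutputStream SHARED = new ByteArrayOutputStream();

    /** Writes the document into the shared buffer and returns the buffer itself, without copying. */
    static ByteArrayOutputStream build(String json) {
        SHARED.reset();
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        SHARED.write(bytes, 0, bytes.length);
        return SHARED;
    }

    /** Queues references to the shared buffer: every queued entry ends up holding the last document. */
    static List<String> queueReferences(String... docs) {
        List<ByteArrayOutputStream> queue = new ArrayList<>();
        for (String doc : docs) {
            queue.add(build(doc));
        }
        List<String> drained = new ArrayList<>();
        for (ByteArrayOutputStream builder : queue) {
            drained.add(new String(builder.toByteArray(), StandardCharsets.UTF_8));
        }
        return drained;
    }

    /** Queues a private copy of the bytes: each queued entry keeps its own document. */
    static List<String> queueCopies(String... docs) {
        List<byte[]> queue = new ArrayList<>();
        for (String doc : docs) {
            queue.add(build(doc).toByteArray()); // toByteArray() returns a fresh copy
        }
        List<String> drained = new ArrayList<>();
        for (byte[] bytes : queue) {
            drained.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return drained;
    }
}
```

With two different documents, `queueReferences` returns the second document twice, while `queueCopies` returns both documents intact.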


(jason) #3

I tried adding the requests to a BulkRequestBuilder, but the exception is
still not going away. My program is multithreaded: the first thread
adds objects to the queue, and another thread retrieves those
objects from the queue and indexes them in bulk. The JSON objects can
be thought of as IndexRequest objects. What do you think is the
best way to solve this problem? It is really not complex, but
BulkResponse behaves strangely for me.

(In reply to Shay Banon's message of Apr 12, 11:55 am.)

(Shay Banon) #4

BulkRequestBuilder is not thread safe. If you want to do that, you can either use the XContentFactory#safeXXX methods to get thread-safe builders and keep the original code you posted, or do the conversion on the thread that adds elements to the bulk request.

(In reply to eugene's message of Wednesday, April 13, 2011 at 3:03 AM.)
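
The second option in this reply, doing the conversion on the thread that adds elements to the bulk request, might look roughly like the following. This is a plain-Java sketch under stated assumptions: the `ArrayList` batch stands in for the bulk request builder, the string concatenation stands in for building the JSON source, and the `STOP` marker and class name are hypothetical. The point is only that the producer hands over immutable values, and the non-thread-safe batch is touched by a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch: confine a non-thread-safe batch to the single consumer thread. */
final class SingleThreadBatcher {
    // Hypothetical end-of-input marker; a real document with this value would also stop the consumer.
    private static final String STOP = "__STOP__";

    /** Runs one producer and one consumer, and returns the batches the consumer flushed. */
    static List<List<String>> run(List<String> docs, int bulkSize) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final List<List<String>> flushed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            for (String doc : docs) {
                queue.add(doc); // only immutable values cross the thread boundary
            }
            queue.add(STOP);
        });

        Thread consumer = new Thread(() -> {
            List<String> batch = new ArrayList<>(); // not thread safe, so never shared
            try {
                while (true) {
                    String doc = queue.take();
                    if (doc.equals(STOP)) {
                        break;
                    }
                    batch.add("{\"body\":\"" + doc + "\"}"); // conversion happens on this thread
                    if (batch.size() == bulkSize) {
                        flushed.add(batch); // stands in for executing the bulk request
                        batch = new ArrayList<>();
                    }
                }
                if (!batch.isEmpty()) {
                    flushed.add(batch);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flushed;
    }
}
```

Calling `SingleThreadBatcher.run(Arrays.asList("a", "b", "c"), 2)` yields two batches: one with two converted documents and one with the remaining document.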

(jason) #5

Thanks Shay, that's good advice. However, I am still getting this
exception. How can I avoid buffer reuse? When I aggregate
objects using BulkRequestBuilder, I am still adding new IndexRequest
objects to the builder and populating an XContentBuilder.
Also, could it be happening because I am not assigning an id to each
IndexRequest? I checked, and the id is set to null for all of them for
some reason.

(In reply to Shay Banon's message of Apr 13, 12:53 am.)

(Shay Banon) #6

When you add an IndexRequest to the BulkRequestBuilder, it copies over the buffer. You should not see that in this case. Can you gist a sample of the code you use?

(In reply to eugene's message of Thursday, April 14, 2011 at 12:22 AM.)
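
The copy-on-add behaviour described in this reply can be illustrated with a small stand-alone sketch. `CopyingAggregator` is a hypothetical class, not the BulkRequestBuilder code; it only assumes an aggregator that copies the relevant slice of the caller's buffer when an entry is added, so that later reuse of that buffer cannot affect what was already added.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical aggregator that copies each source buffer when it is added. */
final class CopyingAggregator {
    private final List<byte[]> sources = new ArrayList<>();

    /** Stores a private copy of buffer[offset, offset + length). */
    CopyingAggregator add(byte[] buffer, int offset, int length) {
        sources.add(Arrays.copyOfRange(buffer, offset, offset + length));
        return this;
    }

    /** Returns a copy of the stored bytes for entry i. */
    byte[] get(int i) {
        return sources.get(i).clone();
    }

    /** Demo: one scratch buffer is reused for two documents, yet both entries survive. */
    static String[] demo() {
        byte[] scratch = new byte[16];
        CopyingAggregator bulk = new CopyingAggregator();

        byte[] first = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);
        System.arraycopy(first, 0, scratch, 0, first.length);
        bulk.add(scratch, 0, first.length);

        byte[] second = "{\"b\":2}".getBytes(StandardCharsets.UTF_8);
        System.arraycopy(second, 0, scratch, 0, second.length); // overwrites the scratch buffer
        bulk.add(scratch, 0, second.length);

        return new String[] {
            new String(bulk.get(0), StandardCharsets.UTF_8),
            new String(bulk.get(1), StandardCharsets.UTF_8)
        };
    }
}
```

Because the copy is taken inside `add`, the first entry still holds the first document after the scratch buffer has been overwritten.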

(jason) #7

Here is the gist:

This is a private gist. I didn't include the output.log and
files.properties files because they contain production data. Please let me
know if you need more information.

Thank you,
Eugene.


(Shay Banon) #8

I don't understand your logic there in the LogProcessor run method. You create a single builder object, and then index it "bulkSize" times? Shouldn't you be indexing "bulkSize" different sources?

On Thursday, April 14, 2011 at 1:29 AM, eugene wrote:

Here is the gist:

https://gist.github.com/c1400655e0d8728cac49

This is a private gist. I didn't include the output.log and
files.properties files because they contain production data. Please let me
know if you need more information.

Thank you,
Eugene.


(jason) #9

Oops, I made a mistake. I corrected it and updated the gist, but the
exception still occurs.


(jason) #10

Oops, I made a mistake. I fixed it and updated the gist. That did not
eliminate the exception, but the logic should be correct now.


(Shay Banon) #11

Also, your code is not thread safe. You are using a LinkedList and accessing it from different threads without proper synchronization. Even synchronizing on the methods of the ES handler won't help.

What you want is a LinkedBlockingQueue as the message-passing queue (a better choice is LinkedTransferQueue, but that's in Java 7, though ES ships with one, so you can use it). Have the trailer push data into the queue, and have the processor poll for data. Once it gets a new element, add it to the BulkRequestBuilder. Then, using BulkRequestBuilder#numberOfActions, you can "flush" that bulk once it passes a specified bulk size, reset to a new BulkRequestBuilder, and continue from there.

I should probably write a thread-safe BulkProcessor helper class that does this behind the scenes, and possibly lets you control the number of concurrent bulk requests allowed, the bulk size / number of actions, and so on, so things will be simpler.
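
For reference, the queue-and-flush pattern described above can be sketched roughly as follows. This is only an illustration that uses the JDK alone: the class name QueueBatcher, the method drainInBatches, and the end-of-input marker are made up for the example, and a plain list stands in for the BulkRequestBuilder so the snippet runs without Elasticsearch. The comments mark where the calls from the original post would presumably go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the producer/processor pair discussed in this thread.
class QueueBatcher {
    // Unique marker object telling the consumer that no more documents are coming.
    private static final String END = new String("__END__");

    // Pushes docs through a blocking queue from a producer thread and groups
    // them, on the calling thread, into batches of at most bulkSize items.
    static List<List<String>> drainInBatches(List<String> docs, int bulkSize) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<List<String>> flushed = new ArrayList<>();

        // Producer: the only thread that writes to the queue.
        Thread producer = new Thread(() -> {
            try {
                for (String doc : docs) {
                    queue.put(doc);
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Consumer: the only thread that touches the current batch, so the
        // batch itself needs no locking.
        List<String> batch = new ArrayList<>();
        try {
            while (true) {
                String doc = queue.take();       // blocks until an element arrives
                if (doc == END) {                // identity check against the marker
                    break;
                }
                batch.add(doc);                  // with ES: request.add(...)
                if (batch.size() >= bulkSize) {  // with ES: request.numberOfActions()
                    flushed.add(batch);          // with ES: request.execute().actionGet()
                    batch = new ArrayList<>();   // with ES: request = client.prepareBulk()
                }
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while batching", e);
        }
        if (!batch.isEmpty()) {
            flushed.add(batch);                  // flush the final partial batch
        }
        return flushed;
    }
}
```

Calling QueueBatcher.drainInBatches with five documents and a bulk size of 2 yields three batches, the last one holding the single leftover document. The point of the design is that only one thread ever adds to or flushes the batch, while the blocking queue handles the hand-off between threads.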

(jason) #12

I updated the gist following your approach and, unfortunately, still
couldn't get rid of this exception. What do you think I am doing
wrong?


(Shay Banon) #13

I don't know, I can't see anything that jumps out (but I might be missing something). If you can recreate it with a simple test case, for example something that just bulk indexes dummy data (or actual data, but it should not be bigger than a single main class), then I can have a look.

(jason) #14

I just wrote a simple test and came up with code that works. I
updated the gist so you can take a look.

It seems that I need to aggregate everything in a single XContentBuilder
first by adding objects to it, and only at the end add that
XContentBuilder as the source to the BulkRequestBuilder. Is this
intended behavior? Why can't I add multiple IndexRequests, each with
its own XContentBuilder source?


(Shay Banon) #15

You can use several index requests, that's not a problem. The flow should be:

for (item in items) {
    XContentBuilder builder = XContentFactory....
    // build json using item
    // add the index request to the bulk request
}
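
For anyone reading along, here is a minimal sketch of that flow in plain Java. It deliberately avoids the Elasticsearch client so that it runs on its own: `Batch` is a hypothetical stand-in for the bulk request builder, `buildJson` is a hypothetical stand-in for building a document with an XContentBuilder, and the bulk size of 2 is arbitrary. It only illustrates the shape of the loop: build a new document for each item, add it to the current batch, and start a fresh batch after each flush.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BulkFlowSketch {

    /** Hypothetical stand-in for a bulk request: it only collects finished documents. */
    static final class Batch {
        private final List<String> docs = new ArrayList<>();

        void add(String doc) { docs.add(doc); }

        int numberOfActions() { return docs.size(); }

        List<String> docs() { return docs; }
    }

    /** Builds one JSON document per call; no builder or buffer is shared between items. */
    static String buildJson(String message) {
        StringBuilder builder = new StringBuilder(); // fresh builder for every item
        builder.append("{\"message\":\"")
               .append(message.replace("\\", "\\\\").replace("\"", "\\\""))
               .append("\"}");
        return builder.toString();
    }

    /** Returns the batches in the order they would have been sent. */
    static List<List<String>> indexAll(List<String> items, int bulkSize) {
        List<List<String>> sent = new ArrayList<>();
        Batch batch = new Batch();
        for (String item : items) {
            batch.add(buildJson(item));           // one document per item
            if (batch.numberOfActions() >= bulkSize) {
                sent.add(batch.docs());           // "execute" the full batch
                batch = new Batch();              // continue with a new batch
            }
        }
        if (batch.numberOfActions() > 0) {
            sent.add(batch.docs());               // flush whatever is left over
        }
        return sent;
    }

    public static void main(String[] args) {
        List<List<String>> sent = indexAll(Arrays.asList("a", "b", "c"), 2);
        System.out.println(sent);
    }
}
```

With a real client, the flush step is where the bulk request would be executed and its response checked for failures. That part is left out here because it depends on the client version in use.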
On Thursday, April 14, 2011 at 8:41 PM, eugene wrote:

I just recreated the simple test, and came up with the code that
works. I updated the gist and you may take a look.

It seems that I need to aggregate XContentBuilder first by adding
objects to it, and only at the end add XContentBuilder source to the
BulkRequestBuilder. Is this an intended behavior? Why can't I add
multiple IndexRequests with each having its own XContentBuilder
source?

On Apr 14, 2:06 am, Shay Banon shay.ba...@elasticsearch.com wrote:

I don't know, can't see something that jumps out (but I might miss something). If you can recreate it with a simple test case, for example, something that just bulk indexes dummy data (or actual data, but it should bot be bigger than a main class), then I can have a look.

On Thursday, April 14, 2011 at 4:14 AM, eugene wrote:

I updated the gist following your approach, and unfortunately, still
coudn't get rid of this exception. What do you think I am doing
wrong?

On Apr 13, 3:58 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

Also, your code is not thread safe. You are using a LinkedList and access it from different threads without proper sync. Even if you sync on the methods of ES handler it won't help.

What you want to do is have a LinkedBlockingQueue as the message passing queue (better one to use is LinkedTransferQueue, but thats in Java7, though ES comes with one, so you can use it). Have the trailer push data into the queue, and have the processor poll for data. Once it gets a new element, add it to the BulkRequestBuilder. Then, using the BulkREquestBuilder#numberOfActions you can "flush" that bulk once it passes a specified bulk size, reset to a new BulkRequestBuilder, and continue from there.

I should probably write a helper (thread safe) BulkProcessor class that does that behind the scenes, and possibly be able to allow to control the number of concurrent bulk requests allowed, bulk size / actions, and so on, so things will be simpler.

On Thursday, April 14, 2011 at 1:50 AM, eugene wrote:

Oops, I made a mistake. I corrected it and updated the gist, and the
exception occurs.

On Apr 13, 3:46 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

I don't understand your logic there in the LogProcessor run method. You create a single builder object, and then index it "bulkSize" times? Shouldn't you be index "bulkSize" different sources?

On Thursday, April 14, 2011 at 1:29 AM, eugene wrote:

Here is the gist:

https://gist.github.com/c1400655e0d8728cac49

This is a private gist, I didn't put the output.log file and
files.properties because they contains production data. Please let me
know if you need more information.

Thank you,
Eugene.

On Apr 13, 2:48 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

When you add an IndexRequest to the BulkRequestBuilder, it copies over the buffer. You should not see that in this case. Can you gist a sample of the code you use?

On Thursday, April 14, 2011 at 12:22 AM, eugene wrote:

Thanks Shay, it's a good advice. However, I am still getting this
exception. How can I avoid buffer reuse, since when I aggregate
objects using BulkRequestBuilder, I am still adding new IndexRequest
objects to the builder and populate XContentBuilder.
Also, can it be happening because I am not assigning id to each
IndexRequest? I checked and the id is set to null for all of them for
some reason.

On Apr 13, 12:53 am, Shay Banon shay.ba...@elasticsearch.com wrote:

BulkRequestBuilder is not thread safe. If you want to do that, then you can Use the XContetFactory#safeXXX methods to thread safe builders and use the original code you posted, or, do the conversion on the thread that adds elements to the bulk request.

On Wednesday, April 13, 2011 at 3:03 AM, eugene wrote:

I tried to add requests to BuildRequestBuilder but the exception is
still not going away. My program is multithreaded, where first thread
adds objects to to the queue, and another thread retrieves those
objects from the queue and indexes them as bulk. The json objects can
be thought of IndexRequest objects. What do you think will be the
best way to solve this problem? It is really not complex, but
BulkResponse behaves strangely for me.

On Apr 12, 11:55 am, Shay Banon shay.ba...@elasticsearch.com wrote:

Don't use a list of XContentBuilders, there is no need for it (and what causes the problem because of buffers reuse). Simply use prepareBulk, save the BulkRequestBuilder, and use that as your aggregator.

On Tuesday, April 12, 2011 at 9:30 PM, eugene wrote:

Hi everyone,

I am trying to send a bulk request to the elastic search, and I am
getting an exception. Please see the code and the stacktrace below:

LinkedList queue = .....//assume
the queue is populated
BulkRequestBuilder request = client.prepareBulk();
int bulkSize = 2;
for (int i = 0; i < bulkSize; i++) {
XContentBuilder jsonObj = queue.remove();

request.add(Requests.indexRequest(handler.getIndex()).type(handler.getType(­­­­­­))
.source(jsonObj));
}

BulkResponse response = request.execute().actionGet(); //throws
exception at this line
if (response.hasFailures()) {
System.err.println("--> failures...");
}

Exception in thread "Thread-1"
org.elasticsearch.transport.RemoteTransportException: [Black Cat]
[inet[/10.32.40.17:9300]][indices/bulk]
Caused by: java.io.EOFException
at org.elasticsearch.common.io.stream.LZFStreamInput.readBytes(LZFStreamInput.java:97)
at org.elasticsearch.common.io.stream.StreamInput.readUTF(StreamInput.java:123)
at org.elasticsearch.common.io.stream.HandlesStreamInput.readUTF(HandlesStreamInput.java:49)
at org.elasticsearch.action.support.replication.ShardReplicationOperationRequest.readFrom(ShardReplicationOperationRequest.java:132)
at org.elasticsearch.action.index.IndexRequest.readFrom(IndexRequest.java:573)
at org.elasticsearch.action.bulk.BulkRequest.readFrom(BulkRequest.java:261)
at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:180)
at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:85)
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:302)
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)
at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:51)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:540)
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:274)
...

(jason) #16

This is the part that does not work for me. Have you tried the
example from the gist? Please let me know if you see anything wrong in
my code.

On Apr 14, 10:57 am, Shay Banon shay.ba...@elasticsearch.com wrote:

You can use several index requests, that's not a problem. The flow should be:

for (item in items) {
    XContentBuilder builder = XContentFactory....
    // build json using item
    // add the index request to the bulk request
}

On Thursday, April 14, 2011 at 8:41 PM, eugene wrote:

I just recreated the simple test, and came up with the code that
works. I updated the gist and you may take a look.

It seems that I need to aggregate XContentBuilder first by adding
objects to it, and only at the end add XContentBuilder source to the
BulkRequestBuilder. Is this an intended behavior? Why can't I add
multiple IndexRequests with each having its own XContentBuilder
source?

On Apr 14, 2:06 am, Shay Banon shay.ba...@elasticsearch.com wrote:

I don't know, I can't see anything that jumps out (but I might be missing something). If you can recreate it with a simple test case, for example something that just bulk indexes dummy data (or actual data, but it should not be bigger than a main class), then I can have a look.

On Thursday, April 14, 2011 at 4:14 AM, eugene wrote:

I updated the gist following your approach, and unfortunately, still
couldn't get rid of this exception. What do you think I am doing
wrong?

On Apr 13, 3:58 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

Also, your code is not thread safe. You are using a LinkedList and accessing it from different threads without proper synchronization. Even if you synchronize on the methods of the ES handler, it won't help.

What you want to do is have a LinkedBlockingQueue as the message passing queue (a better one to use is LinkedTransferQueue, but that's in Java 7, though ES comes with one, so you can use it). Have the trailer push data into the queue, and have the processor poll for data. Once it gets a new element, add it to the BulkRequestBuilder. Then, using BulkRequestBuilder#numberOfActions, you can "flush" that bulk once it passes a specified bulk size, reset to a new BulkRequestBuilder, and continue from there.

I should probably write a helper (thread safe) BulkProcessor class that does that behind the scenes, and possibly allows controlling the number of concurrent bulk requests allowed, bulk size / actions, and so on, so things will be simpler.
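
The queue-and-flush flow described above can be sketched in plain Java. This is only an illustration under stated assumptions: BulkBatcher, drain and the flush callback are hypothetical names, the ArrayList stands in for a BulkRequestBuilder, and the callback stands in for executing the bulk request. No Elasticsearch API is used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class BulkBatcher {

    /**
     * Polls items from the queue and hands them to flush in batches of at
     * most bulkSize. Returns once the queue has stayed empty for pollMillis,
     * flushing any partial batch first.
     */
    public static <T> void drain(BlockingQueue<T> queue, int bulkSize, long pollMillis,
                                 Consumer<List<T>> flush) throws InterruptedException {
        // Hypothetical stand-in for a BulkRequestBuilder.
        List<T> batch = new ArrayList<>(bulkSize);
        T item;
        while ((item = queue.poll(pollMillis, TimeUnit.MILLISECONDS)) != null) {
            batch.add(item);
            if (batch.size() >= bulkSize) {
                flush.accept(batch);                // stand-in for executing the bulk
                batch = new ArrayList<>(bulkSize);  // reset to a new batch
            }
        }
        if (!batch.isEmpty()) {
            flush.accept(batch);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Producer thread pushes data into the thread-safe queue.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                queue.add("doc-" + i);
            }
        });
        producer.start();
        producer.join();

        // Consumer polls and flushes every two items.
        drain(queue, 2, 100, batch -> System.out.println("flushing " + batch));
    }
}
```

Only the consumer thread ever touches the batch, so the batch itself needs no locking; the blocking queue is the single point shared between threads.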

On Thursday, April 14, 2011 at 1:50 AM, eugene wrote:

Oops, I made a mistake. I corrected it and updated the gist, and the
exception occurs.

On Apr 13, 3:46 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

I don't understand your logic there in the LogProcessor run method. You create a single builder object, and then index it "bulkSize" times? Shouldn't you be indexing "bulkSize" different sources?

On Thursday, April 14, 2011 at 1:29 AM, eugene wrote:

Here is the gist:

https://gist.github.com/c1400655e0d8728cac49

This is a private gist. I didn't include the output.log file and
files.properties because they contain production data. Please let me
know if you need more information.

Thank you,
Eugene.

On Apr 13, 2:48 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

When you add an IndexRequest to the BulkRequestBuilder, it copies over the buffer. You should not see that in this case. Can you gist a sample of the code you use?

On Thursday, April 14, 2011 at 12:22 AM, eugene wrote:

Thanks Shay, that's good advice. However, I am still getting this
exception. How can I avoid buffer reuse, given that when I aggregate
objects using BulkRequestBuilder, I am still adding new IndexRequest
objects to the builder and populating an XContentBuilder for each?
Also, can it be happening because I am not assigning an id to each
IndexRequest? I checked and the id is set to null for all of them for
some reason.

On Apr 13, 12:53 am, Shay Banon shay.ba...@elasticsearch.com wrote:

BulkRequestBuilder is not thread safe. If you want to do that, then you can use the XContentFactory#safeXXX methods to get thread-safe builders and use the original code you posted, or do the conversion on the thread that adds elements to the bulk request.

On Wednesday, April 13, 2011 at 3:03 AM, eugene wrote:

I tried to add requests to BulkRequestBuilder but the exception is
still not going away. My program is multithreaded, where the first thread
adds objects to the queue, and another thread retrieves those
objects from the queue and indexes them as bulk. The json objects can
be thought of as IndexRequest objects. What do you think will be the
best way to solve this problem? It is really not complex, but
BulkResponse behaves strangely for me.

On Apr 12, 11:55 am, Shay Banon shay.ba...@elasticsearch.com wrote:

Don't use a list of XContentBuilders, there is no need for it (and it is what causes the problem, because of buffer reuse). Simply use prepareBulk, save the BulkRequestBuilder, and use that as your aggregator.
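
To see why holding builders in a list can go wrong when buffers are reused, here is a small analogy in plain Java. It is not Elasticsearch code: BufferReuseDemo, cachedBuilder and safeBuilder are hypothetical names, and the assumption that a cached XContentBuilder behaves like the shared StringBuilder below is inferred only from the "buffer reuse" remark above.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferReuseDemo {

    private static final StringBuilder SHARED = new StringBuilder();

    /** Mimics a cached builder: every call resets and returns the same buffer. */
    public static StringBuilder cachedBuilder() {
        SHARED.setLength(0);
        return SHARED;
    }

    /** Mimics a "safe" builder: every call returns its own buffer. */
    public static StringBuilder safeBuilder() {
        return new StringBuilder();
    }

    /** Queues two builders first, then reads their contents afterwards. */
    public static List<String> queueThenRead(boolean safe) {
        List<StringBuilder> queue = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            StringBuilder b = safe ? safeBuilder() : cachedBuilder();
            b.append("{\"id\":").append(i).append('}');
            queue.add(b); // holds the builder itself, not a copy of its content
        }
        List<String> out = new ArrayList<>();
        for (StringBuilder b : queue) {
            out.add(b.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("cached: " + queueThenRead(false));
        System.out.println("safe:   " + queueThenRead(true));
    }
}
```

With the cached variant both queue entries point at the same buffer, so the first document is overwritten by the second before either is read. Copying the content out as soon as each document is built, which is what adding it to the bulk aggregator right away amounts to, avoids this.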

(Shay Banon) #17

You are not doing what I wrote in any version of your gist. Not sure where the disconnect is.... To go the extra mile, I took your code and gisted how it should work: https://gist.github.com/920160.

(jason) #18

Thank you very much for posting an example. I am sorry to keep
bugging you, but this code is not working for me either. I think what
I wrote in my version and what you suggested is the same. You are
recreating XContentBuilder within the loop, and I am adding only two
items so no loop is needed.
Anyway, could you try running your code and let me know if it runs
successfully?

Thanks,
Eugene.

(Shay Banon) #19

Yes, it runs fine for me. Which version are you using? I ran it on master, but nothing specifically changed in that area from 0.15.2.

-shay.banon

(jason) #20

I am using the 0.16.0-snapshot version. I see two variables, builder and
builder2, in your example. Are you adding both to BulkRequestBuilder?

On Apr 14, 1:51 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

Yes, it runs fine for me. Which version are you using? I ran it on master, but nothing specifically changed in that area from 0.15.2.

-shay.banon

On Thursday, April 14, 2011 at 11:18 PM, eugene wrote:

Thank you very much for posting an example. I am sorry to keep
bugging you, but this code is not working for me either. I think what
I wrote in my version and what you suggested is the same. You are
recreating XContentBuilder within the loop, and I am adding only two
items so no loop is needed.
Anyway, could you try running your code and let me know if it runs
successfully?

Thanks,
Eugene.

On Apr 14, 11:35 am, Shay Banon shay.ba...@elasticsearch.com wrote:

You are not doing what I wrote in any version of your gist. Not sure where the disconnect is... To go the extra mile, I took your code and gisted how it should work: https://gist.github.com/920160.

On Thursday, April 14, 2011 at 9:12 PM, eugene wrote:

This is the part that does not work for me. Have you tried the
example from the gist? Please let me know if you see anything wrong in
my code.

On Apr 14, 10:57 am, Shay Banon shay.ba...@elasticsearch.com wrote:

You can use several index requests, that's not a problem. The flow should be:

for (item in items) {
    XContentBuilder builder = XContentFactory....
    // build json using item
    // add to bulk request the index request
}
On Thursday, April 14, 2011 at 8:41 PM, eugene wrote:

I just recreated the simple test, and came up with the code that
works. I updated the gist and you may take a look.

It seems that I need to aggregate into a single XContentBuilder first by
adding objects to it, and only at the end add that XContentBuilder as the
source to the BulkRequestBuilder. Is this intended behavior? Why can't I add
multiple IndexRequests, each with its own XContentBuilder source?

On Apr 14, 2:06 am, Shay Banon shay.ba...@elasticsearch.com wrote:

I don't know, I can't see anything that jumps out (but I might be missing something). If you can recreate it with a simple test case, for example, something that just bulk indexes dummy data (or actual data, but it should not be bigger than a main class), then I can have a look.

On Thursday, April 14, 2011 at 4:14 AM, eugene wrote:

I updated the gist following your approach, and unfortunately, still
couldn't get rid of this exception. What do you think I am doing
wrong?

On Apr 13, 3:58 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

Also, your code is not thread safe. You are using a LinkedList and access it from different threads without proper sync. Even if you sync on the methods of ES handler it won't help.

What you want to do is have a LinkedBlockingQueue as the message passing queue (a better one to use is LinkedTransferQueue, but that's in Java 7, though ES comes with one, so you can use it). Have the trailer push data into the queue, and have the processor poll for data. Once it gets a new element, add it to the BulkRequestBuilder. Then, using BulkRequestBuilder#numberOfActions, you can "flush" that bulk once it passes a specified bulk size, reset to a new BulkRequestBuilder, and continue from there.

I should probably write a helper (thread safe) BulkProcessor class that does that behind the scenes, and possibly allows controlling the number of concurrent bulk requests, the bulk size / number of actions, and so on, so things will be simpler.
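
A minimal sketch of the queue-and-flush flow described above, using only the JDK so it runs without Elasticsearch. The class name BulkBatcher, the sentinel value, and the List<String> standing in for BulkRequestBuilder are all assumptions made for illustration; in real code the flush step is where the bulk request would be executed and a fresh BulkRequestBuilder created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in: a List<String> plays the role of BulkRequestBuilder.
class BulkBatcher {
    // Unique sentinel object; compared by identity so no real document can match it.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private final int bulkSize;

    BulkBatcher(int bulkSize) {
        this.bulkSize = bulkSize;
    }

    // Called by the producer thread; the queue handles the synchronization.
    void offer(String doc) throws InterruptedException {
        queue.put(doc);
    }

    void stop() throws InterruptedException {
        queue.put(STOP);
    }

    // Consumer loop: only this thread ever touches the current batch.
    void drain() throws InterruptedException {
        List<String> batch = new ArrayList<>();
        while (true) {
            String doc = queue.take();
            if (doc == STOP) {
                break;
            }
            batch.add(doc);
            if (batch.size() >= bulkSize) {
                flush(batch);
                batch = new ArrayList<>(); // "reset to a new builder"
            }
        }
        if (!batch.isEmpty()) {
            flush(batch); // send whatever is left over
        }
    }

    // In real code this is where the bulk request would be executed.
    private void flush(List<String> batch) {
        flushed.add(batch);
    }

    // Runs one producer (the caller) and one consumer thread, returns the batches.
    static List<List<String>> run(int docs, int bulkSize) {
        BulkBatcher batcher = new BulkBatcher(bulkSize);
        Thread consumer = new Thread(() -> {
            try {
                batcher.drain();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            consumer.start();
            for (int i = 0; i < docs; i++) {
                batcher.offer("{\"n\":" + i + "}");
            }
            batcher.stop();
            consumer.join(); // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return batcher.flushed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2));
    }
}
```

The point of the design is that the batch is confined to the consumer thread, so nothing that is not thread safe is ever shared; the blocking queue is the only object both threads touch.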

On Thursday, April 14, 2011 at 1:50 AM, eugene wrote:

Oops, I made a mistake. I corrected it and updated the gist, and the
exception occurs.

On Apr 13, 3:46 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

I don't understand your logic there in the LogProcessor run method. You create a single builder object, and then index it "bulkSize" times? Shouldn't you be indexing "bulkSize" different sources?

On Thursday, April 14, 2011 at 1:29 AM, eugene wrote:

Here is the gist:

https://gist.github.com/c1400655e0d8728cac49

This is a private gist. I didn't include the output.log file and
files.properties because they contain production data. Please let me
know if you need more information.

Thank you,
Eugene.

On Apr 13, 2:48 pm, Shay Banon shay.ba...@elasticsearch.com wrote:

When you add an IndexRequest to the BulkRequestBuilder, it copies over the buffer. You should not see that in this case. Can you gist a sample of the code you use?

On Thursday, April 14, 2011 at 12:22 AM, eugene wrote:

Thanks Shay, that's good advice. However, I am still getting this
exception. How can I avoid buffer reuse, since when I aggregate
objects using BulkRequestBuilder, I am still adding new IndexRequest
objects to the builder and populating an XContentBuilder for each?
Also, can it be happening because I am not assigning an id to each
IndexRequest? I checked and the id is set to null for all of them for
some reason.

On Apr 13, 12:53 am, Shay Banon shay.ba...@elasticsearch.com wrote:

BulkRequestBuilder is not thread safe. If you want to do that, then you can use the XContentFactory#safeXXX methods to get thread safe builders and use the original code you posted, or do the conversion on the thread that adds elements to the bulk request.

On Wednesday, April 13, 2011 at 3:03 AM, eugene wrote:

I tried to add requests to BulkRequestBuilder but the exception is
still not going away. My program is multithreaded: the first thread
adds objects to the queue, and another thread retrieves those
objects from the queue and indexes them as a bulk. The json objects can
be thought of as IndexRequest objects. What do you think would be the
best way to solve this problem? It is really not complex, but
BulkResponse behaves strangely for me.

On Apr 12, 11:55 am, Shay Banon shay.ba...@elasticsearch.com wrote:

Don't use a list of XContentBuilders, there is no need for it (and it is what causes the problem, because of buffer reuse). Simply use prepareBulk, save the BulkRequestBuilder, and use that as your aggregator.
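
To illustrate the general hazard of buffer reuse mentioned above, here is a small JDK-only sketch. It is not Elasticsearch's implementation: the class BufferReuseDemo and the methods cachedBuilder and safeBuilder are hypothetical, and a StringBuilder stands in for XContentBuilder. The assumption being illustrated is a factory that hands back one recycled buffer per thread, so that queueing several "builders" really queues several references to the same buffer.

```java
import java.util.ArrayList;
import java.util.List;

class BufferReuseDemo {
    // Hypothetical factory state: one recycled buffer per thread.
    private static final ThreadLocal<StringBuilder> CACHED =
            ThreadLocal.withInitial(StringBuilder::new);

    // Returns the same buffer every time on a given thread, cleared for reuse.
    static StringBuilder cachedBuilder() {
        StringBuilder sb = CACHED.get();
        sb.setLength(0);
        return sb;
    }

    // Returns a fresh buffer that nothing else will overwrite.
    static StringBuilder safeBuilder() {
        return new StringBuilder();
    }

    // Builds two documents, queues the builders, and only then reads them back.
    static List<String> queueThenRead(boolean safe) {
        List<StringBuilder> queue = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            StringBuilder b = safe ? safeBuilder() : cachedBuilder();
            b.append("{\"n\":").append(i).append('}');
            queue.add(b);
        }
        List<String> out = new ArrayList<>();
        for (StringBuilder b : queue) {
            out.add(b.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        // With the cached factory both queue entries point at one buffer,
        // so the first document has been overwritten by the second.
        System.out.println("cached: " + queueThenRead(false));
        System.out.println("safe:   " + queueThenRead(true));
    }
}
```

Under that assumption, the two ways out are the ones suggested in this thread: ask for a builder that is not recycled, or hand the content to the bulk request right away so it is copied before the buffer is reused.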

On Tuesday, April 12, 2011 at 9:30 PM, eugene wrote:

Hi everyone,

I am trying to send a bulk request to the elastic search, and I am
getting an exception. Please see the code and the stacktrace below:

LinkedList<XContentBuilder> queue = .....// assume the queue is populated
BulkRequestBuilder request = client.prepareBulk();
int bulkSize = 2;
for (int i = 0; i < bulkSize; i++) {
    XContentBuilder jsonObj = queue.remove();
    request.add(Requests.indexRequest(handler.getIndex()).type(handler.getType())
            .source(jsonObj));
}

BulkResponse response = request.execute().actionGet(); // throws exception at this line
if (response.hasFailures()) {
    System.err.println("--> failures...");
}

Exception in thread "Thread-1" org.elasticsearch.transport.RemoteTransportException: [Black Cat][inet[/10.32.40.17:9300]][indices/bulk]
Caused by: java.io.EOFException
    at org.elasticsearch.common.io.stream.LZFStreamInput.readBytes(LZFStreamInput.java:97)
    at org.elasticsearch.common.io.stream.StreamInput.readUTF(StreamInput.java:123)
    at org.elasticsearch.common.io.stream.HandlesStreamInput.readUTF(HandlesStreamInput.java:49)
    at org.elasticsearch.action.support.replication.ShardReplicationOperationRequest.readFrom(ShardReplicationOperationRequest.java:132)
    at org.elasticsearch.action.index.IndexRequest.readFrom(IndexRequest.java:573)
    at org.elasticsearch.action.bulk.BulkRequest.readFrom(BulkRequest.java:261)

...
