Indexing large number of documents


(Petr Janský) #1

Hello,

I'm trying to index >300k docs using the Java API.

public class Fetcher {

    public static String server = "localhost";
    public static Integer port = 9300;
    public static String index = "default";
    public static String type = "default";
    public static String typeAttributename = null;
    static Client client = null;
    private static Fetcher inst;

    Settings settings = ImmutableSettings.settingsBuilder()
            .put("cluster.name", "elasticsearch")
            .put("node.name", "Killer")
            .build();

    public synchronized static Fetcher getInstace() {
        if (inst == null) {
            inst = new Fetcher();
        }
        return inst;
    }

    public Fetcher() {
        client = new TransportClient(settings).addTransportAddress(
                new InetSocketTransportAddress(server, port));
    }

    public void index(DocumentVo document) {
        try {
            String type = Fetcher.type;
            if (typeAttributename != null && document.getData().get(typeAttributename) != null) {
                type = document.getData().get(typeAttributename).toString();
                type = type.toLowerCase();
            }
            IndexRequestBuilder rs = client.prepareIndex().setIndex(index).setType(type);
            rs.setTimeout(new TimeValue(10000));
            rs.setSource(document.getData());
            rs.execute().actionGet();
        } catch (Exception e) {
            e.printStackTrace();
            client.close();
            client = new TransportClient(settings).addTransportAddress(
                    new InetSocketTransportAddress(server, port));
            index(document);
        }
    }

    public void close() {
        client.close();
    }
}

in ~20 threads I run

Fetcher.getInstace().index(document);

I've created my own tokenizer filter that is quite slow, so I'm getting:

Feb 17, 2014 9:53:51 AM org.elasticsearch.client.transport
INFO: [Killer] failed to get node info for
[#transport#-1][inet[localhost/127.0.0.1:9300]], disconnecting...
org.elasticsearch.transport.ReceiveTimeoutTransportException:
[][inet[localhost/127.0.0.1:9300]][cluster/nodes/info] request_id [2899]
timed out after [5001ms]
at
org.elasticsearch.transport.TransportService$TimeoutHandler.run(TransportService.java:351)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

org.elasticsearch.client.transport.NoNodeAvailableException: No node
available
at
org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:249)
at
org.elasticsearch.action.TransportActionNodeProxy$1.handleException(TransportActionNodeProxy.java:84)
at
org.elasticsearch.transport.TransportService$Adapter$2$1.run(TransportService.java:311)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

It seems that
rs.setTimeout(new TimeValue(10000));
in my index method doesn't work.

How can I set up a timeout for indexing using the API?

Is it correct to use one TransportClient for multiple (10-60) threads?

Thanks
Petr

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/f5f15b57-955c-4fcf-b225-3974e37e447b%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.


(Jilles van Gurp) #2

You'll want to use the bulk API instead of indexing one document at a time. That scales a lot better; I've indexed tens of millions of documents that way in minutes. You can use multithreading with bulk requests as well, but you may not want to exceed the number of CPUs you can dedicate to indexing. Keep the batch sizes limited to a few hundred to a few thousand documents at most - basically, go for a size that ES still handles in around a second or so.

If you really need to index one document at a time, you'll probably want to lower your ambitions a bit with the number of threads. The error you are getting means that all nodes are busy with your previous requests. Increasing the timeout won't fix your problem; these requests should normally complete within a few milliseconds, and the fact that they don't means you are hitting a bottleneck somewhere.

Jilles

On Monday, February 17, 2014 10:04:19 AM UTC+1, Petr Janský wrote:



(Ivan Brusic) #3

You are overwhelming the elasticsearch server. Instead of playing around
with the timeout settings and the number of threads, consider using the
Bulk API:
http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/current/bulk.html

The bulk processor class is extremely useful:
http://xbib.org/elasticsearch/1.0.0.Beta2-SNAPSHOT/apidocs/org/elasticsearch/action/bulk/BulkProcessor.html
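
A hedged sketch of wiring up that BulkProcessor with the 1.x API (the thresholds and the listener bodies are illustrative choices, not prescriptions):

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

public class BulkProcessorExample {

    public static BulkProcessor build(Client client) {
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // e.g. log request.numberOfActions() before each flush
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    System.err.println(response.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // the whole bulk request failed (e.g. no node available)
                failure.printStackTrace();
            }
        })
        .setBulkActions(1000)                            // flush every 1000 docs
        .setFlushInterval(TimeValue.timeValueSeconds(5)) // or every 5 seconds
        .setConcurrentRequests(1)                        // one bulk in flight at a time
        .build();
    }
}
```

Threads then just call `processor.add(...)` with their index requests; the processor handles batching and flushing, and `close()` flushes the tail.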

--
Ivan

On Mon, Feb 17, 2014 at 1:04 AM, Petr Janský petr.jansky@6hats.cz wrote:



(Jörg Prante) #4

Yes, the BulkProcessor is useful - the official link to the source is

https://github.com/elasticsearch/elasticsearch/blob/master/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

Thanks Ivan for pointing to my Javadoc but I think it is better to
reference the source :wink:

Petr, what ES cluster is this, how many nodes, how much heap?

You should carefully design your cluster and your indexing process before
putting indexing load on it - or you must travel down the bumpy road, learn
for yourself, and fix all kinds of issues to get it to run smoothly.

It is ok to use a single TransportClient instance, but not the way you do in
the catch clause.

Also, it is bad practice to drop the IndexResponse returned by
execute().actionGet(). Because of the async nature of the API, you can end up
sending far too many requests one after another. Please evaluate the
responses, and continue only if limits are not exceeded and there is no error
in a response - errors in a response do not throw exceptions.
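
As a sketch of what evaluating responses and limiting in-flight requests could look like for single-document indexing with the 1.x client (the semaphore bound and index/type names are illustrative assumptions):

```java
import java.util.Map;
import java.util.concurrent.Semaphore;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;

public class ThrottledIndexer {

    // Allow at most 50 index requests in flight; callers beyond that block.
    private final Semaphore inFlight = new Semaphore(50);
    private final Client client;

    public ThrottledIndexer(Client client) {
        this.client = client;
    }

    public void index(Map<String, Object> doc) throws InterruptedException {
        inFlight.acquire(); // back-pressure: wait for a free slot before sending
        client.prepareIndex("default", "default")
                .setSource(doc)
                .execute(new ActionListener<IndexResponse>() {
                    @Override
                    public void onResponse(IndexResponse response) {
                        inFlight.release();
                        // inspect response.getId(), response.getVersion(), ...
                    }

                    @Override
                    public void onFailure(Throwable e) {
                        inFlight.release();
                        e.printStackTrace(); // decide here whether to retry or stop
                    }
                });
    }
}
```

This bounds the number of outstanding requests instead of blindly retrying with a rebuilt client, which is what the catch clause in the original code does.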

Jörg

On Mon, Feb 17, 2014 at 5:58 PM, Ivan Brusic ivan@brusic.com wrote:




(Ivan Brusic) #5

I respectfully disagree. :slight_smile:

In object oriented programming, you code to the interface, not the
implementation. Then again, most people should be using code aware IDEs,
which makes code lookup even easier.

Judging by his settings, I am assuming he is using a single local instance.
Before getting into further design, using bulk indexing should be his
baseline to measure by.

Cheers,

Ivan

On Mon, Feb 17, 2014 at 9:17 AM, joergprante@gmail.com <
joergprante@gmail.com> wrote:




(system) #6