Elasticsearch ports using ES-Hadoop

Hi Costin,

I am trying to run a Spark application in production mode with a 3-node Spark cluster and a 3-node ES cluster, using 32 partitions. The following is the ES-Hadoop configuration for a simple search:

val defaultSparkConfig = Map(
  "es.nodes" -> "10.xxx.xx.xx",
  "es.port" -> "9201",
  "es.scroll.size" -> "10000",
  "es.resource.from" -> "cortez/data",
  "es.resource.to" -> "cortez/newdata",
  "es.batch.size.bytes" -> "500M",
  "es.batch.size.entries" -> "20000",
  "es.clusterName" -> "xxxxxxxx"
)
Please note that 9201 is set in our elasticsearch.yml as our http.port. During the run, there are traces in the log where a ReceiveTimeoutTransportException occurs. Note that this does not happen for all tasks (partitions); it happens randomly for certain tasks. The log says the timeout occurred at port 9301 (our transport.tcp.port). Why is the connector trying to hit ES on port 9301 here? Any help would be appreciated. Here is the full log:

org.elasticsearch.transport.ReceiveTimeoutTransportException: [es_prod_********1][inet[/10.xxx.xx.xx:9301]][cluster/state] request_id [564] timed out after [5034ms]
at org.elasticsearch.transport.TransportService$TimeoutHandler.run(TransportService.java:370)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)

What makes you think this is an es-connector issue? The stack trace doesn't contain any es-connector classes; rather, it indicates the typical node-to-node communication. Port 9301 is your transport.tcp.port, which the Elasticsearch nodes themselves (and the native TransportClient) use to talk to each other - the connector communicates over REST on the HTTP port (9201 in your case).

Also, your configuration has some issues:

  • there are no es.resource.from or es.resource.to settings (see the sketch after this list)
  • your batch and scroll sizes are way too big and are likely the cause of your issues. There's a reason the defaults are what they are - increasing them 10x, 100x or more will not gain performance; quite the opposite.
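
For reading from one index/type and writing to another in the same job, here is a minimal sketch of what I mean (assuming the es.resource.read / es.resource.write setting names and the EsSpark overloads of current connector versions - check the reference docs for your release):

import org.apache.spark.SparkContext
import org.elasticsearch.spark.rdd.EsSpark

// Sketch only; index/type names are taken from the post above.
def copyType(sc: SparkContext): Unit = {
  val cfg = Map(
    "es.nodes"          -> "10.xxx.xx.xx",
    "es.port"           -> "9201",
    "es.resource.read"  -> "cortez/data",    // instead of es.resource.from
    "es.resource.write" -> "cortez/newdata"  // instead of es.resource.to
  )

  // Read (docId, document) pairs from the read resource...
  val source = EsSpark.esRDD(sc, cfg)
  // ...and index them, keeping the ids, into the write resource.
  EsSpark.saveToEsWithMeta(source, cfg)
}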

This is explained in the reference docs, so I recommend you go through them; here's a short explanation:

  • a scroll.size of 10K docs means you are saving on cheap requests by forcing the ES nodes to keep track of 10K documents for every scroll request
  • a 500M batch means each task will send half a gig to ES. Why? A batch should be small enough to be processed fast, in 1-2s. Sending half a gig is bound to make the cluster unresponsive.
  • 20K entries - how big are your entries? If you're dealing with 100K per doc, you are looking at 2GB of data as a maximum limit.
    Again, that is per task - 10 tasks means multiply that by 10.

Start with the defaults, measure performance and then see if you can improve it.
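
Something along these lines, for example (a sketch - the point is simply to drop the oversized settings so the connector's defaults apply):

// Sketch: omit es.scroll.size / es.batch.size.bytes / es.batch.size.entries
// entirely so the connector falls back to its defaults, then tune one
// knob at a time while measuring.
val conservativeConfig = Map(
  "es.nodes" -> "10.xxx.xx.xx",
  "es.port"  -> "9201"
)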

Thanks for the reply, Costin. I know the error is not caused by the es connector; that's why I tried editing my post, but it didn't help. :) Anyway, you answered a lot of the questions about the configuration that I had tried asking in another post as well. Regarding es.resource.from and es.resource.to, these two properties are used directly for reading and writing when creating RDDs with EsSpark, precisely because the same job reads from one type and writes to a different type, and that works totally fine.

Regarding the error, nearly all the descriptions I found on Google point either to a version mismatch between ES and the ES Java API, or to an error in the communication between nodes. The communication errors can be caused by network issues or by memory usage. It looks like memory usage here, given the configuration I was using.

The reason for using such a high-end configuration was that the document count was huge while each document was very small (approx. 40K each). Updating with the default configuration was taking a lot of time due to the large document count, hence I tried the bigger settings. The question always revolves around memory vs. time. However, as you suggested, if the bigger configuration is causing ES to misbehave, it doesn't make sense to use it. I will try to bring it to a level where everyone is happy.
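
For the record, a quick back-of-the-envelope on my old settings (illustrative arithmetic only, using the ~40K-per-document figure above):

// Rough arithmetic only - sizes are approximations from this thread.
val docSizeBytes   = 40L * 1024        // ~40 KB per document
val batchEntries   = 20000L            // es.batch.size.entries from my config
val entriesLimitMb = docSizeBytes * batchEntries / (1024 * 1024)
println(s"20K entries x ~40KB = ~$entriesLimitMb MB per bulk request per task")
// ~781 MB; the 500M byte cap (es.batch.size.bytes) would actually trigger
// first, but either way each task pushes hundreds of MB at the cluster at once.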

Thanks for the help.
Piyush