Facing too many file open or how to hit ES multiple times using spark


(Gunjit Bansal) #1

Hi,

My use case is to hit ES multiple times using Spark. In this, i am hitting ES 10000 times. I am getting too many files exception. But its working fine for 5K request

To achieve this, I am hitting through REST API.

I am using Spark 1.6. It is a 4 node cluster having each node 30GB ram and 8 core.

The ulimit is 1,000,000 for all the users. Also for this code why it is opening these many files whereas for other jobs, it is running fine.

Code:

ES.java

package testinges;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import utils.MapperUtil;

import java.io.IOException;

public class ES {

    public static boolean query(String json_data) throws IOException {
        String url = "http://localhost:9200/sampleindex/_search";
        HttpClient client = HttpClientBuilder.create().build();
        HttpPost post_request = new HttpPost(url);

        StringEntity json_to_post = new StringEntity(json_data);
        post_request.setEntity(json_to_post);
        post_request.addHeader("Accept", "application/json");

        HttpResponse response = client.execute(post_request);

        HttpEntity entity = response.getEntity();
        if (entity != null) {
            String jsonString = EntityUtils.toString(entity, "UTF-8");
            JsonNode jsonNodeFromString = MapperUtil.getJsonNodeFromString(jsonString);
            if(jsonNodeFromString.has("hits")){
                String hitCount = jsonNodeFromString.get("hits").get("total").toString();
                int hit = Integer.parseInt(hitCount);
                if(hit>0) return true;
            }

        }


        return false;
    }
}

ESTest.scala

package testinges
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.udf

object ESTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("es.index.auto.create", "true")
      .set("es.nodes.wan.only", "true")
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
      .set("es.write.operation", "upsert")
      .setAppName("ESTest")
      //.setMaster("local[*]")
    val sparkContext = new org.apache.spark.SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)
    def esQueryBuilder(field : String, fieldValue : String) : String= {
      val query =
        """{
            "query": {
                "match_phrase": {
                   """"+field+"\": \""+fieldValue+""""
                }
            }
        }"""
      query
    }

    val field2pass = "field1"

    val testUdf = udf{
      (value : String)=>{
        if(value!=null) {
          val queryString = esQueryBuilder(field2pass, value)
          var op= ES.query(queryString)
          op
        }else false
      }
    }

    val inputPath = args(0)
    var data = sqlContext.read.text(inputPath)
    println(data.rdd.partitions.length)
    println(data.count())

    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS")
    println(sdf.format(new Date()))
    var op = data.withColumn("Flag",testUdf(data("value")))
    op.show(false)
    println(op.where("Flag=false").count())
    println(sdf.format(new Date()))
    op.where("Flag=false").show(false)
    op.printSchema()
  }

}

(David Pilato) #2

Please don't post images of text as they are hardly readable and not searchable.

Instead paste the text and format it with </> icon. Check the preview window.


(Gunjit Bansal) #3

Hi , Please check now


(David Pilato) #4

Do you have logs?


(Gunjit Bansal) #5
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 8, hadoop4-dev.localhost.com): java.net.SocketException: Too many open files
	at java.net.Socket.createImpl(Socket.java:460)
	at java.net.Socket.getImpl(Socket.java:520)
	at java.net.Socket.setReuseAddress(Socket.java:1449)
	at org.apache.http.impl.conn.HttpClientConnectionOperator.connect(HttpClientConnectionOperator.java:109)
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:314)
	at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:363)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:219)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:195)
	at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:85)
	at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:108)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:186)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:106)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
	at testinges.ES.query(ES.java:26)
	at testinges.ESTest$$anonfun$1.apply(ESTest.scala:38)
	at testinges.ESTest$$anonfun$1.apply(ESTest.scala:35)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
	at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
	at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

(David Pilato) #6

I meant elasticsearch logs


(Gunjit Bansal) #7

Ok, I will share the elastic search logs shortly.
Just want to know is it a right approach to use ES with spark??


(system) #8

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.