Hi,
My use case is to query ES many times from Spark, going through the REST API. When I fire 10,000 requests the job fails with a "too many open files" exception, but the same code works fine for 5,000 requests.
I am using Spark 1.6 on a 4-node cluster; each node has 30 GB RAM and 8 cores.
The ulimit is 1,000,000 for all users. Why does this code open so many files, when my other jobs run fine under the same limit?
Code:
ES.java
package testinges;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import utils.MapperUtil;
import java.io.IOException;
public class ES {

    public static boolean query(String json_data) throws IOException {
        String url = "http://localhost:9200/sampleindex/_search";
        // A new client is built on every call to query()
        HttpClient client = HttpClientBuilder.create().build();
        HttpPost post_request = new HttpPost(url);
        StringEntity json_to_post = new StringEntity(json_data);
        post_request.setEntity(json_to_post);
        post_request.addHeader("Accept", "application/json");
        HttpResponse response = client.execute(post_request);
        HttpEntity entity = response.getEntity();
        if (entity != null) {
            String jsonString = EntityUtils.toString(entity, "UTF-8");
            JsonNode jsonNodeFromString = MapperUtil.getJsonNodeFromString(jsonString);
            // Return true when the query matched at least one document
            if (jsonNodeFromString.has("hits")) {
                String hitCount = jsonNodeFromString.get("hits").get("total").toString();
                int hit = Integer.parseInt(hitCount);
                if (hit > 0) return true;
            }
        }
        return false;
    }
}
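Re-reading ES.java, my suspicion is that this is where the descriptors go: every call to query() builds a fresh HttpClient, and neither the client nor the response is ever closed, so each row can leave a socket behind until the executor hits the limit. Is that the cause? Below is roughly what I was planning to try instead, as a rough, untested sketch (ESShared is just a name I made up): one pooled client per executor JVM, with every response closed.

// Hypothetical replacement for ES.java (untested sketch): one pooled
// client per JVM instead of a new client per row, and every response
// closed so its connection is released back to the pool.
package testinges

import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.util.EntityUtils
import utils.MapperUtil

object ESShared {

  private val url = "http://localhost:9200/sampleindex/_search"

  // Lazily built once per JVM and reused by every task on that executor.
  private lazy val client: CloseableHttpClient = HttpClients.createDefault()

  def query(jsonData: String): Boolean = {
    val post = new HttpPost(url)
    post.setEntity(new StringEntity(jsonData))
    post.addHeader("Accept", "application/json")
    val response = client.execute(post)
    try {
      val entity = response.getEntity
      if (entity == null) false
      else {
        // Fully consuming the entity frees the underlying connection.
        val jsonString = EntityUtils.toString(entity, "UTF-8")
        val node = MapperUtil.getJsonNodeFromString(jsonString)
        node.has("hits") && node.get("hits").get("total").asInt() > 0
      }
    } finally {
      response.close()
    }
  }
}

If that is on the right track, does the shared client also need a client.close() somewhere, or is relying on JVM shutdown good enough here?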
ESTest.scala
package testinges

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.udf

object ESTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("es.index.auto.create", "true")
      .set("es.nodes.wan.only", "true")
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
      .set("es.write.operation", "upsert")
      .setAppName("ESTest")
      //.setMaster("local[*]")

    val sparkContext = new org.apache.spark.SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)

    // Builds a match_phrase query for the given field/value pair.
    def esQueryBuilder(field: String, fieldValue: String): String =
      s"""{
         |  "query": {
         |    "match_phrase": {
         |      "$field": "$fieldValue"
         |    }
         |  }
         |}""".stripMargin

    val field2pass = "field1"

    // The UDF issues one HTTP request to ES per row.
    val testUdf = udf { (value: String) =>
      if (value != null) ES.query(esQueryBuilder(field2pass, value))
      else false
    }

    val inputPath = args(0)
    val data = sqlContext.read.text(inputPath)
    println(data.rdd.partitions.length)
    println(data.count())

    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS")
    println(sdf.format(new Date()))

    val op = data.withColumn("Flag", testUdf(data("value")))
    op.show(false)
    println(op.where("Flag=false").count())
    println(sdf.format(new Date()))
    op.where("Flag=false").show(false)
    op.printSchema()
  }
}
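Alternatively, would it be cleaner to drop the UDF and do the lookup with mapPartitions, so that connection reuse is explicit per partition? This is the rough shape I have in mind (untested; it assumes the hypothetical ESShared object from the sketch above and reuses esQueryBuilder and field2pass from my code):

// Hypothetical replacement for the withColumn/UDF step: each partition
// runs inside one executor JVM, so all of its rows would share
// ESShared's pooled client instead of opening a client per row.
import sqlContext.implicits._

val op = data.rdd.mapPartitions { rows =>
  rows.map { row =>
    val value = row.getString(0)
    val flag =
      if (value != null) ESShared.query(esQueryBuilder(field2pass, value))
      else false
    (value, flag)
  }
}.toDF("value", "Flag")

Would either of these actually stop the file-descriptor growth, or am I looking in the wrong place?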