Hello Folks,
I am trying to read logs from a Kafka server and insert them into Elasticsearch 5.0.0 with Spark Streaming 2.0.0.
Here is my code. My main problem is with the "saveJsonToEs" call: this function takes a String argument that specifies the index name, but my index name is a JavaDStream<String>. I did it this way in order to generate dynamic index names in another class.
JavaDStream<List<String>> newLines = lines.map(arg0 -> {
    String lineToInsertInES = "";
    String indexName = "";
    List<String> list = new ArrayList<String>();
    // some code to determine strings to add in my list
    list.add(lineToInsertInES);
    list.add(indexName);
    return list;
});
JavaDStream<String> lineToInsertInES = newLines.map(list -> list.get(0));
JavaDStream<String> indexName = newLines.map(list -> list.get(1));
lineToInsertInES.foreachRDD(line -> {
    if (!line.isEmpty())
        JavaEsSpark.saveJsonToEs(line, indexName); // problem at this line: indexName is a JavaDStream<String>, not a String
});
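One workaround I can imagine, though I am not sure it is the right approach, is to keep each line together with its index name and group the lines by index name first, so that every group can then be saved with a plain String index. Below is a minimal sketch of only the grouping step in plain Java, without Spark. The class IndexGrouping, the method groupByIndex, and the sample index names are hypothetical and only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark or elasticsearch-hadoop.
class IndexGrouping {

    // Takes [jsonLine, indexName] pairs (same layout as the lists built in
    // the map step above) and groups the JSON lines by index name.
    public static Map<String, List<String>> groupByIndex(List<List<String>> pairs) {
        Map<String, List<String>> byIndex = new LinkedHashMap<>();
        for (List<String> pair : pairs) {
            String json = pair.get(0);   // document to insert
            String index = pair.get(1);  // target index for this document
            byIndex.computeIfAbsent(index, k -> new ArrayList<>()).add(json);
        }
        return byIndex;
    }

    public static void main(String[] args) {
        // Made-up sample data, only to exercise the grouping.
        List<List<String>> pairs = Arrays.asList(
            Arrays.asList("{\"msg\":\"a\"}", "logs-a"),
            Arrays.asList("{\"msg\":\"b\"}", "logs-b"),
            Arrays.asList("{\"msg\":\"c\"}", "logs-a"));

        // Each entry now has a plain String key. In the streaming job, this is
        // where one saveJsonToEs call per index name would go (assumption:
        // the documents of that index are first turned into a JavaRDD<String>).
        groupByIndex(pairs).forEach((index, docs) ->
            System.out.println(index + ": " + docs.size() + " document(s)"));
    }
}
```

With this layout the index name is an ordinary String per group, which is what saveJsonToEs expects as its second argument. I have not tested how this would perform inside foreachRDD.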
Can you tell me how I can solve this?
Thank you in advance.
Y