How to change dynamically the index name in "saveJsonToES"?

Hello Folks,

I am trying to insert logs that I extract from a kafka server in order to insert in ElasticSearc 5.0.0 with Spark Streaming 2.0.0 .

Here is my code. My big problem is with line "saveJsonToES", in fact, this function has a string argument for specifying the index name. However, my index name is a JavaDStream. I did like this in oder to generate dynamic index names in another class.

JavaDStream<List<String>> newLines = lines.map(arg0 -> {    

    String lineToInsertInES = "";
    String indexName = "";
    List<String> list = new ArrayList<String>();

    //some code to determine strings to add in my list

    list.add(lineToInsertInES);
    list.add(indexName);
    return list;
});

JavaDStream<String> lineToInsertInES = newLines.map(list -> list.get(0));
JavaDStream<String> indexName = newLines.map(list -> list.get(1));

lineToInsertInES.foreachRDD(line->{

    if(!line.isEmpty())
        JavaEsSpark.saveJsonToEs(line,indexName); //problem at this line
});

Can you telle how I can solve this ?

Thank you in advance

Y

Sounds more like a Spark than an Elasticsearch question to me. What prevents you from converting the JavaDStream to a regular String before passing it as an argument?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.