JavaEsSpark.saveToES not using pre-defined mapping fields while posting the data to ES cluster

Hello All,

We have been trying to push data to an Elasticsearch cluster using the Spark method "JavaEsSpark.saveToEs".

I have defined the mappings and settings for the "rd_innovation_config1" index in the ES cluster, and below is the code snippet of the JavaRDD used to save the documents to Elasticsearch:

    JavaRDD<InnovationConfig> iConfigRDD = initParquetDf.javaRDD().map(y ->
            InnovationConfig.builder()
                    .isoCntryCode(y.getString(0))
                    .chrID(y.getString(1))
                    .chrValID(y.getString(2))
                    .processType(y.getString(4))
                    .activeID(y.getString(5))
                    .releaseInd(String.valueOf(y.getLong(6)))
                    .build());

    JavaEsSpark.saveToEs(iConfigRDD, "rd_innovation_config1/innovation_config");

InnovationConfig.builder() builds a POJO in which each field is annotated with @JsonProperty, as shown below:

package com.ogrds.datamodel.es.rdinnovationconfig;

import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.modeliosoft.modelio.javadesigner.annotations.objid;
import lombok.Builder;
import lombok.Getter;

@objid ("0dcc4051-9213-435e-88da-50c7d6433b8f")
@JsonClassDescription("rd_innovation_config")
@Builder
@Getter
public class InnovationConfig implements Serializable {
    @objid ("99e5975b-401e-4aa6-8736-26e21348e4b5")
    @JsonProperty("ACTIVE_ID")
    public String activeID;

    @objid ("b71d26fd-4203-4ffc-a93a-ceef95aaf91e")
    @JsonProperty("CHR_ID")
    public String chrID;

    @objid ("2fba5712-ff54-4375-a3a2-e94a8a9dbf48")
    @JsonProperty("CHR_VAL_ID")
    public String chrValID;

    @objid ("ef742f1e-ff29-42a1-afca-172049e87b9e")
    @JsonProperty("ISO_COUNTRY_CODE")
    public String isoCntryCode;

    @objid ("f402da18-5123-43c2-b93f-86da44cd787a")
    @JsonProperty("PROCESS_TYPE")
    public String processType;

    @objid ("e2b7beb8-119c-4663-a98d-2055a62c70e7")
    @JsonProperty("RELEASE_IND")
    public String releaseInd;
}

But after the saveToEs operation completes, the values are not saved to the right fields in the pre-defined mapping; it seems the @JsonProperty annotations are not being applied during serialization.

Please correct me if I am doing anything wrong, and let me know your thoughts.

Thanks,
Ganeshbabu R

Hi Ganeshbabu

Did you set es.index.auto.create to false in your Spark config, as follows?

sparkConf.set("es.index.auto.create", "false") -> that way it does not create the index from InnovationConfig.builder().

Regards

Hi @rulanitee

No, I have set it like this: sparkConf.set("es.index.auto.create", "true")

Regards,
Ganeshbabu R

Hi,

If you have already created the index and defined the mappings, then you can set that config to false.

Regards

Yes @rulanitee, I had already created the index with mappings before pushing the data to ES, but after pushing the data the values are not mapped to the existing fields.

Below is the mapping of the rd_innovation_config1 index.

You can clearly see that two sets of fields are mapped to the same index:

  1. Manually created fields at the top (i.e. UPPERCASE)
  2. Automatically created fields at the bottom (i.e. lowercase)
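To make the symptom concrete, the duplicated mapping would look roughly like this (hypothetical excerpt reconstructed from the description above; the field types are assumptions):

```json
{
  "mappings": {
    "innovation_config": {
      "properties": {
        "ACTIVE_ID": { "type": "keyword" },
        "CHR_ID": { "type": "keyword" },
        "ISO_COUNTRY_CODE": { "type": "keyword" },
        "activeID": { "type": "text" },
        "chrID": { "type": "text" },
        "isoCntryCode": { "type": "text" }
      }
    }
  }
}
```

The UPPERCASE properties are the pre-defined ones; the lowerCamelCase ones were auto-created from the bean's field names when the documents were indexed.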

Please let me know your thoughts on this...

Regards,
Ganeshbabu R

Hi,

Oh, I see now. The JSON annotations are being ignored. Quick question: do you want to save to ES as JSON or as an RDD of objects?

As I see it, you want the data to be serialized using the JSON annotations you specified.

If that is the case, why not convert InnovationConfig to a JSON string and push it to ES in the shape you expect the data to be, via JavaEsSpark.saveJsonToEs(...), as documented.

If I am not mistaken, it is currently being serialized as-is, as per the documentation.

Hope it helps ...

Regards

Yes, I want to save as JSON in ES.

@rulanitee Could you please share some sample code showing how to get JSON strings from the RDD?

As I am new to this technology, I am stuck on getting the JSON strings from the RDD.

Please let me know your feedback

Thanks,
Ganeshbabu R

Hi,

Sorry for getting back to you late. Here is a quick-and-dirty way of sending the data as JSON:

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonClassDescription("rd_innovation_config")
public class InnovationConfig {

    @JsonProperty("ACTIVE_ID")
    public String activeID;

    @JsonProperty("CHR_ID")
    public String chrID;

    @JsonProperty("CHR_VAL_ID")
    public String chrValID;

    @JsonProperty("ISO_COUNTRY_CODE")
    public String isoCntryCode;

    @JsonProperty("PROCESS_TYPE")
    public String processType;

    @JsonProperty("RELEASE_IND")
    public String releaseInd;
}

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public final class JavaSpark {

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("EsJavaClient").setMaster("local");
        conf.set("es.index.auto.create", "false");
        // and the rest of your settings

        JavaSparkContext sc = new JavaSparkContext(conf);

        InnovationConfig innovationConfig = new InnovationConfig();
        innovationConfig.activeID = "0";
        innovationConfig.chrID = "33986";
        innovationConfig.chrValID = "72291112";
        innovationConfig.isoCntryCode = "7";
        innovationConfig.processType = "INITIATIVE";
        innovationConfig.releaseInd = "14733550531568";

        InnovationConfig innovationConfig1 = new InnovationConfig();
        innovationConfig1.activeID = "1";
        innovationConfig1.chrID = "4444444";
        innovationConfig1.chrValID = "5555555";
        innovationConfig1.isoCntryCode = "750";
        innovationConfig1.processType = "ONE_TESTING";
        innovationConfig1.releaseInd = "14733550531568";

        // Using Jackson, convert the objects to JSON strings (this honors @JsonProperty)
        ObjectMapper mapper = new ObjectMapper();
        String innovationConfigJsonString = mapper.writeValueAsString(innovationConfig);
        String innovationConfig1JsonString = mapper.writeValueAsString(innovationConfig1);

        // Parallelize the JSON strings and push them to ES as-is
        JavaRDD<String> stringRDD = sc.parallelize(ImmutableList.of(innovationConfigJsonString, innovationConfig1JsonString));
        JavaEsSpark.saveJsonToEs(stringRDD, "rd_innovation_config1/innovation_config");
    }
}

And the result is as follows:

Hope this will set you in the right direction, let me know how it goes ...

Regards

@r.ganeshbabu ES-Hadoop does not support JSON annotations on your fields. Instead, it only respects the getter method names of the Java bean. Those method names are converted to the implied property names, formatted in lowerCamelCase style, when the bean is converted into JSON. If you want to use these annotations, I would suggest calling the .map() method on the RDD, converting each bean into JSON inside it using whichever object mapper you prefer, and then saving the resulting JSON to Elasticsearch with the es.input.json = true property.
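The conversion step suggested above can be sketched without a running cluster. This is a minimal, self-contained illustration: the class name and the hand-built toJson helper are hypothetical, standing in for Jackson's ObjectMapper.writeValueAsString, which in the real job would run inside iConfigRDD.map(...) before JavaEsSpark.saveJsonToEs(...):

```java
// Sketch only: convert the bean to the JSON shape the pre-defined mapping
// expects (UPPERCASE field names). In the actual Spark job this conversion
// would run inside iConfigRDD.map(...), typically via Jackson's ObjectMapper,
// which honors @JsonProperty, before JavaEsSpark.saveJsonToEs(...).
public class InnovationConfigJsonSketch {

    // Bean mirroring the thread's InnovationConfig (Lombok/getters omitted).
    static class InnovationConfig {
        String activeID, chrID, chrValID, isoCntryCode, processType, releaseInd;
    }

    // Hand-built JSON standing in for ObjectMapper.writeValueAsString(config);
    // the keys match the @JsonProperty names, i.e. the index's mapped fields.
    static String toJson(InnovationConfig c) {
        return "{"
                + "\"ACTIVE_ID\":\"" + c.activeID + "\","
                + "\"CHR_ID\":\"" + c.chrID + "\","
                + "\"CHR_VAL_ID\":\"" + c.chrValID + "\","
                + "\"ISO_COUNTRY_CODE\":\"" + c.isoCntryCode + "\","
                + "\"PROCESS_TYPE\":\"" + c.processType + "\","
                + "\"RELEASE_IND\":\"" + c.releaseInd + "\""
                + "}";
    }

    public static void main(String[] args) {
        InnovationConfig c = new InnovationConfig();
        c.activeID = "0";
        c.chrID = "33986";
        c.chrValID = "72291112";
        c.isoCntryCode = "7";
        c.processType = "INITIATIVE";
        c.releaseInd = "14733550531568";
        System.out.println(toJson(c));
    }
}
```

Because the documents arrive at Elasticsearch already serialized, the pre-defined UPPERCASE fields are used and no lowerCamelCase duplicates are auto-created.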

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.