We have been trying to push data to an Elasticsearch cluster using the Spark method "JavaEsSpark.saveToEs".
I have defined the "rd_innovation_config1" index mappings and settings in the ES cluster, and below is the code snippet of the JavaRDD used to save the documents to Elasticsearch.
InnovationConfig.builder() builds the POJO, in which each field has @JsonProperty set as shown below:
package com.ogrds.datamodel.es.rdinnovationconfig;
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.modeliosoft.modelio.javadesigner.annotations.objid;
import lombok.Builder;
import lombok.Getter;
@objid ("0dcc4051-9213-435e-88da-50c7d6433b8f")
@JsonClassDescription("rd_innovation_config")
@Builder
@Getter
public class InnovationConfig implements Serializable {
@objid ("99e5975b-401e-4aa6-8736-26e21348e4b5")
@JsonProperty("ACTIVE_ID")
public String activeID;
@objid ("b71d26fd-4203-4ffc-a93a-ceef95aaf91e")
@JsonProperty("CHR_ID")
public String chrID;
@objid ("2fba5712-ff54-4375-a3a2-e94a8a9dbf48")
@JsonProperty("CHR_VAL_ID")
public String chrValID;
@objid ("ef742f1e-ff29-42a1-afca-172049e87b9e")
@JsonProperty("ISO_COUNTRY_CODE")
public String isoCntryCode;
@objid ("f402da18-5123-43c2-b93f-86da44cd787a")
@JsonProperty("PROCESS_TYPE")
public String processType;
@objid ("e2b7beb8-119c-4663-a98d-2055a62c70e7")
@JsonProperty("RELEASE_IND")
public String releaseInd;
}
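For what it's worth, serializing the bean directly with a plain Jackson ObjectMapper does produce the UPPERCASE keys from the annotations. A minimal sketch (class and field names shortened here for illustration):

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Minimal sketch: a stand-alone Jackson ObjectMapper honors @JsonProperty,
// emitting the UPPERCASE keys defined in the annotations rather than the
// lowerCamelCase field names.
public class JsonPropertyDemo {
    public static class Config {
        @JsonProperty("ACTIVE_ID")
        public String activeID = "0";
        @JsonProperty("CHR_ID")
        public String chrID = "33986";
    }

    public static void main(String[] args) throws Exception {
        String json = new ObjectMapper().writeValueAsString(new Config());
        System.out.println(json); // {"ACTIVE_ID":"0","CHR_ID":"33986"}
    }
}
```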
But after the saveToEs operation completes, the values are not saved to the right fields in the pre-defined mapping; it seems the @JsonProperty annotations are not being applied during serialization.
Yes @rulanitee, I already created the index with mappings before pushing the data to ES, but after pushing the data the values are not mapped to the existing fields.
Below is the mapping of the rd_innovation_config1 index.
You can clearly see that two sets of fields are mapped to the same index:
Manually created fields at the top (i.e. UPPERCASE)
Automatically created fields at the bottom (i.e. lowercase)
Oh, I see now. The JSON annotations are being ignored. Quick question: do you want to save to ES as JSON or as an RDD of beans?
As I see it, you want your InnovationConfig objects to be serialized using the JSON annotations you specified.
If that is the case, why not convert InnovationConfig to a JSON string and push it to ES in the shape you expect the data to be (JavaEsSpark.saveJsonToEs(your_json_object)), as documented?
If I am not mistaken, it is currently being serialized as-is, as per the documentation.
import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonClassDescription("rd_innovation_config")
public class InnovationConfig {
@JsonProperty("ACTIVE_ID")
public String activeID;
@JsonProperty("CHR_ID")
public String chrID;
@JsonProperty("CHR_VAL_ID")
public String chrValID;
@JsonProperty("ISO_COUNTRY_CODE")
public String isoCntryCode;
@JsonProperty("PROCESS_TYPE")
public String processType;
@JsonProperty("RELEASE_IND")
public String releaseInd;
}
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public final class JavaSpark {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("EsJavaClient").setMaster("local");
        conf.set("es.index.auto.create", "false");
        // and the rest of your settings
        JavaSparkContext sc = new JavaSparkContext(conf);

        InnovationConfig innovationConfig = new InnovationConfig();
        innovationConfig.activeID = "0";
        innovationConfig.chrID = "33986";
        innovationConfig.chrValID = "72291112";
        innovationConfig.isoCntryCode = "7";
        innovationConfig.processType = "INITIATIVE";
        innovationConfig.releaseInd = "14733550531568";

        InnovationConfig innovationConfig1 = new InnovationConfig();
        innovationConfig1.activeID = "1";
        innovationConfig1.chrID = "4444444";
        innovationConfig1.chrValID = "5555555";
        innovationConfig1.isoCntryCode = "750";
        innovationConfig1.processType = "ONE_TESTING";
        innovationConfig1.releaseInd = "14733550531568";

        // using Jackson, we convert our objects to JSON strings
        ObjectMapper mapper = new ObjectMapper();
        String innovationConfigJsonString = mapper.writeValueAsString(innovationConfig);
        String innovationConfig1JsonString = mapper.writeValueAsString(innovationConfig1);

        // add the JSON strings to an immutable list and parallelize them
        JavaRDD<String> stringRDD = sc.parallelize(ImmutableList.of(innovationConfigJsonString, innovationConfig1JsonString));

        // push to ES as pre-rendered JSON so the annotated field names are kept
        JavaEsSpark.saveJsonToEs(stringRDD, "rd_innovation_config1/innovation_config");

        sc.stop();
    }
}
@r.ganeshbabu ES-Hadoop does not support JSON annotations on the fields. Instead, it respects only the names of the getter methods on the Java bean: the property names implied by those methods are formatted in lowerCamelCase style when the bean is converted into JSON. If you want to use these annotations, I would suggest calling the .map() method on the RDD, converting each bean into JSON within it using whichever object mapper you prefer, and then saving the resulting JSON to Elasticsearch with the es.input.json = true property.
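The suggested .map() approach can be sketched as below; the `BeanToJson` class and `toJson` helper are illustrative names, assuming Jackson and ES-Hadoop are on the classpath:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative sketch: convert each bean to a JSON string inside the RDD so
// the @JsonProperty names survive, then save the strings with saveJsonToEs.
public class BeanToJson {
    public static class InnovationConfig implements java.io.Serializable {
        @JsonProperty("ACTIVE_ID")
        public String activeID;
        @JsonProperty("PROCESS_TYPE")
        public String processType;
    }

    // The function to pass to rdd.map(...); creating the ObjectMapper inside
    // the call keeps the lambda serializable (in real code, create it once per
    // partition with mapPartitions to avoid per-record construction cost).
    public static String toJson(InnovationConfig bean) {
        try {
            return new ObjectMapper().writeValueAsString(bean);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

On the RDD this becomes something like `JavaRDD<String> jsonRDD = beanRDD.map(BeanToJson::toJson);` followed by `JavaEsSpark.saveJsonToEs(jsonRDD, "rd_innovation_config1/innovation_config");`, which treats the input as pre-rendered JSON.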