How to update (not replace) a document with JavaEsSpark.saveToEs

Issue description

Description

My test ES data:
#! Deprecation: query malformed, empty clause found at [2:13]
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "face_imsi_test",
        "_type": "face_imsi_test",
        "_id": "123",
        "_score": 1,
        "_source": {
          "img_id": "11",
          "img_url": "11",
          "pid": "123",
          "imsis": [
            {
              "imsi114": "10"
            }
          ],
          "update_time": "2017-10-12 19:49:04"
        }
      }
    ]
  }
}

If my new data is "pid:123,imsis{imsi113:10}", I want the existing document to be updated, not replaced.
I want the data to look as follows:
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "face_imsi_test",
        "_type": "face_imsi_test",
        "_id": "123",
        "_score": 1,
        "_source": {
          "img_id": "11",
          "img_url": "11",
          "pid": "123",
          "imsis": [
            {
              "imsi114": "10",
              "imsi113": "10"
            }
          ],
          "update_time": "2017-10-12 19:49:04"
        }
      }
    ]
  }
}

If my new data is "pid:123,imsis{imsi114:20}", I again want the existing document to be updated, not replaced.
I want the data to look as follows:
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "face_imsi_test",
        "_type": "face_imsi_test",
        "_id": "123",
        "_score": 1,
        "_source": {
          "img_id": "11",
          "img_url": "11",
          "pid": "123",
          "imsis": [
            {
              "imsi114": "20",
              "imsi113": "10"
            }
          ],
          "update_time": "2017-10-12 19:49:04"
        }
      }
    ]
  }
}
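
The two desired outcomes above amount to a key-wise merge of the incoming imsis entry into the stored one: new keys are added and existing keys are overwritten. A minimal plain-Java sketch of that intended behaviour follows. The class and method names are hypothetical and only for illustration, and the code does not talk to Elasticsearch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ImsiMergeSketch {
    // Hypothetical helper: returns a copy of `stored` with every entry of
    // `incoming` applied on top (new keys added, existing keys overwritten).
    public static Map<String, Object> mergeImsis(Map<String, Object> stored,
                                                 Map<String, Object> incoming) {
        Map<String, Object> merged = new LinkedHashMap<>(stored);
        merged.putAll(incoming);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> stored = new LinkedHashMap<>();
        stored.put("imsi114", "10");

        // First update: imsi113 is a new key, so it is added.
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("imsi113", "10");
        Map<String, Object> afterFirst = mergeImsis(stored, first);
        System.out.println(afterFirst);   // {imsi114=10, imsi113=10}

        // Second update: imsi114 already exists, so its value is overwritten.
        Map<String, Object> second = new LinkedHashMap<>();
        second.put("imsi114", "20");
        System.out.println(mergeImsis(afterFirst, second));   // {imsi114=20, imsi113=10}
    }
}
```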

But at the moment the document is replaced. With the new data "pid:123,imsis{imsi113:10}",
the result is as follows:
#! Deprecation: query malformed, empty clause found at [2:13]
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "face_imsi_test",
        "_type": "face_imsi_test",
        "_id": "123",
        "_score": 1,
        "_source": {
          "img_id": "11",
          "img_url": "11",
          "pid": "123",
          "imsis": [
            {
              "imsi113": "10"
            }
          ],
          "update_time": "2017-10-12 19:49:04"
        }
      }
    ]
  }
}
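
This outcome is consistent with how Elasticsearch merges a partial document on update: objects are merged recursively, but arrays (such as imsis) are replaced as a whole. A plain index operation would give the same result here, since the new document carries every field. The following is a small plain-Java simulation of that merge rule, not Elasticsearch code; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartialDocMergeSketch {
    // Simulates the partial-document merge rule: nested objects are merged
    // key by key; any other value (including lists) overwrites the stored one.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> merge(Map<String, Object> stored,
                                            Map<String, Object> partial) {
        Map<String, Object> result = new LinkedHashMap<>(stored);
        for (Map.Entry<String, Object> e : partial.entrySet()) {
            Object old = result.get(e.getKey());
            Object incoming = e.getValue();
            if (old instanceof Map && incoming instanceof Map) {
                result.put(e.getKey(),
                        merge((Map<String, Object>) old, (Map<String, Object>) incoming));
            } else {
                result.put(e.getKey(), incoming);
            }
        }
        return result;
    }

    // Builds a reduced version of the document from this post: pid plus a
    // one-element imsis array.
    public static Map<String, Object> doc(String pid, String imsiKey, String imsiValue) {
        Map<String, Object> imsi = new LinkedHashMap<>();
        imsi.put(imsiKey, imsiValue);
        List<Map<String, Object>> imsis = new ArrayList<>();
        imsis.add(imsi);
        Map<String, Object> d = new LinkedHashMap<>();
        d.put("pid", pid);
        d.put("imsis", imsis);
        return d;
    }

    public static void main(String[] args) {
        Map<String, Object> stored = doc("123", "imsi114", "10");
        Map<String, Object> incoming = doc("123", "imsi113", "10");
        // The imsis list is overwritten, so imsi114 is lost.
        System.out.println(merge(stored, incoming));   // {pid=123, imsis=[{imsi113=10}]}
    }
}
```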

ES mapping:

PUT face_imsi
{
  "mappings": {
    "face_imsi": {
      "properties": {
        "pid": {
          "type": "keyword"
        },
        "img_url": {
          "type": "keyword"
        },
        "img_id": {
          "type": "keyword"
        },
        "update_time": {
          "type": "date",
          "doc_values": true,
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "imsis": {
          "type": "nested",
          "properties": {
            "imsi": { "type": "keyword" },
            "match_times": { "type": "integer" }
          }
        }
      }
    }
  }
}

Code:

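import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.util.Shell; // assumed: Hadoop's Shell, which exposes WINDOWS
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// Elasticsearch connection settings
SparkConf conf = new SparkConf();
conf.set("es.nodes", "192.168.1.228");
conf.set("es.port", "9200");
if (Shell.WINDOWS) {
    conf.setMaster("local");
    conf.setAppName(FaceAndImsiTest.class.getName());
}
// The write operation is set to upsert on the session
SparkSession spark = SparkSession.builder()
        .appName("Java Spark SQL basic example")
        .config(conf)
        .config("es.write.operation", "upsert")
        .getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

// New data: pid 123 with imsis [{imsi113: 10}]
List<Map<String, Object>> imsis = new LinkedList<>();
Map<String, Object> imsi = new HashMap<>();
imsi.put("imsi113", "10");
imsis.add(imsi);

List<MatchImsiPo> lm = new LinkedList<>();
MatchImsiPo mp = new MatchImsiPo("123", "11", "11", "2017-10-12 19:49:04", imsis); // MatchImsiPo is my own bean
lm.add(mp);
JavaRDD<MatchImsiPo> esRdd = sc.parallelize(lm);

// Use the pid field as the document _id
Map<String, String> vmId = new HashMap<>();
vmId.put("es.mapping.id", "pid");
JavaEsSpark.saveToEs(esRdd, "face_imsi_test/face_imsi_test", vmId);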
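
One direction that may be worth trying is a scripted upsert, so that the merge happens inside Elasticsearch instead of the imsis array being overwritten. The sketch below only builds the settings map that would be passed to saveToEs in place of vmId. es.write.operation, es.mapping.id, es.update.script.inline, es.update.script.lang and es.update.script.params are documented elasticsearch-hadoop settings. The Painless script and the class name are assumptions and have not been tested against ES 5.5.0; in particular, it is not verified that a nested array field can be passed as a script parameter in this version.

```java
import java.util.HashMap;
import java.util.Map;

public class ScriptedUpsertConfigSketch {
    // Assumed Painless script: merge the first incoming imsis entry into the
    // first stored one, or take the incoming array if nothing is stored yet.
    static final String SCRIPT =
            "if (ctx._source.imsis == null || ctx._source.imsis.isEmpty()) {"
          + " ctx._source.imsis = params.imsis"
          + " } else {"
          + " ctx._source.imsis[0].putAll(params.imsis[0])"
          + " }";

    // Builds the per-write settings for a scripted upsert keyed on pid.
    public static Map<String, String> buildConfig() {
        Map<String, String> cfg = new HashMap<>();
        cfg.put("es.write.operation", "upsert");
        cfg.put("es.mapping.id", "pid");
        cfg.put("es.update.script.lang", "painless");
        cfg.put("es.update.script.inline", SCRIPT);
        // Format is <script param>:<document field>; here the script
        // parameter "imsis" is bound to the document field "imsis".
        cfg.put("es.update.script.params", "imsis:imsis");
        return cfg;
    }

    public static void main(String[] args) {
        buildConfig().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With the RDD from the code above, the call would then be JavaEsSpark.saveToEs(esRdd, "face_imsi_test/face_imsi_test", ScriptedUpsertConfigSketch.buildConfig()). When the document does not exist yet, the upsert operation inserts the new document as-is; the script only runs against documents that already exist.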

Version Info

OS: CentOS 6.5
JVM :
Hadoop/Spark: 2.6.0/2.1.0
ES-Hadoop :
ES : 5.5.0
elasticsearch-spark-20_2.11 : 5.5.0
