Duplicate results with elasticsearch-hadoop Spark

Elasticsearch cluster: 5.2.2
es-hadoop: 5.2.2 (Java)

    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.11</artifactId>
      <version>5.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>5.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>transport</artifactId>
      <version>5.2.2</version>
    </dependency>

I have a query built with the Elasticsearch DSL (using QueryBuilder). If I print that query and run it directly in the Elasticsearch dev tools, it reports about 200M results. But if I run it through es-hadoop Spark and save the output to S3, I get duplicated records. One thing worth noting: the same total number of records is exported to S3, which means some duplicates replaced other records (I verified this).

Here is my code snippet; there is no transformation, I just dump the records to S3 directly:

JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, "index_name/type_name", "query_str");
esRDD.saveAsTextFile("a_path_to_s3");
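For context: es-hadoop reads each index shard with a scroll, mapping shards to Spark partitions, so a task that gets retried or speculatively re-executed can write its partition's documents a second time. As a debugging step (a sketch only; the property names below are standard Spark/es-hadoop settings, but the values are assumptions to verify against your cluster), making tasks fail fast instead of silently retrying can help isolate whether retries are the source of the duplicates:

```properties
# spark-defaults.conf (or pass each line via spark-submit --conf) -- debugging sketch, not a fix
spark.speculation        false   # avoid speculative tasks re-emitting a partition
spark.task.maxFailures   1       # fail fast instead of silently retrying a scroll

# es-hadoop connector settings need the "spark." prefix when set via --conf
spark.es.scroll.size     1000    # scroll page size per request (value is an assumption)
```

If the job starts failing instead of producing duplicates, the duplication is coming from partition re-reads rather than from the query itself.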

In the resulting data on S3, I find duplicated rows with the same _id, which I set explicitly at index time (so I am not using auto-generated _ids).
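One way to confirm where the duplication enters is to compare the total key count against the distinct _id count before writing to S3: on the RDD side that would be `esRDD.keys().count()` versus `esRDD.keys().distinct().count()`. The core of that check, sketched with plain Java collections over a hypothetical sample of _id keys (the sample values are invented for illustration):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class DuplicateIdCheck {
    public static void main(String[] args) {
        // Hypothetical sample of _id keys, standing in for esRDD.keys().collect()
        List<String> ids = Arrays.asList("doc-1", "doc-2", "doc-1", "doc-3");

        long total = ids.size();
        long distinct = new HashSet<>(ids).size();

        System.out.println("total=" + total + ", distinct=" + distinct);
        if (distinct < total) {
            // Duplicates exist before anything is written to S3
            System.out.println("duplicates=" + (total - distinct));
        }
    }
}
```

If `distinct` already falls short of `total` on the RDD, the duplicates are introduced during the read from Elasticsearch, not by the S3 write.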

Here is my query_str:

{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "integer_field_1": {
              "from": 15,
              "to": null,
              "include_lower": true,
              "include_upper": true,
              "boost": 1.0
            }
          }
        }
      ],
      "should": [
        {
          "range": {
            "date_field_1": {
              "from": "now-100d/d",
              "to": null,
              "include_lower": false,
              "include_upper": true,
              "boost": 1.0
            }
          }
        },
        {
          "bool": {
            "must": [
              {
                "range": {
                  "date_field_1": {
                    "from": "now-300d/d",
                    "to": null,
                    "include_lower": false,
                    "include_upper": true,
                    "boost": 1.0
                  }
                }
              },
              {
                "range": {
                  "integer_field_2": {
                    "from": 500,
                    "to": null,
                    "include_lower": false,
                    "include_upper": true,
                    "boost": 1.0
                  }
                }
              }
            ],
            "disable_coord": false,
            "adjust_pure_negative": true,
            "boost": 1.0
          }
        },
        {
          "range": {
            "integer_field_2": {
              "from": 2000,
              "to": null,
              "include_lower": false,
              "include_upper": true,
              "boost": 1.0
            }
          }
        }
      ],
      "disable_coord": false,
      "adjust_pure_negative": true,
      "minimum_should_match": "1",
      "boost": 1.0
    }
  },
  "_source": {
    "includes": [
      "some_field_1",
      "some_field_2"
    ],
    "excludes": []
  }
}

Any help is appreciated!

I've tried with Dataset<Row> and I get duplicates as well. Here's my Dataset code:

Dataset<Row> df = sqlContext.read().format("es").load("index_name/type_name");
df.write().mode(SaveMode.Overwrite).csv("a_path_to_s3");
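In the Dataset case the _id is not part of _source, but the connector can expose it: setting the es-hadoop option `es.read.metadata` to `true` adds a metadata column containing `_id`, which can then feed `dropDuplicates`. That masks rather than explains the double read, but it is useful to confirm the duplicate rows really share an _id. The keep-one-row-per-_id semantics such a dedup applies can be sketched with plain Java collections (the sample rows are invented for illustration):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeepFirstPerId {
    public static void main(String[] args) {
        // Hypothetical (_id, payload) pairs, standing in for the Dataset rows
        List<String[]> rows = Arrays.asList(
            new String[]{"doc-1", "v1"},
            new String[]{"doc-2", "v2"},
            new String[]{"doc-1", "v1-dup"});

        // LinkedHashMap keeps the first payload seen per _id, preserving order
        Map<String, String> deduped = new LinkedHashMap<>();
        for (String[] row : rows) {
            deduped.putIfAbsent(row[0], row[1]);
        }

        System.out.println(deduped);  // prints {doc-1=v1, doc-2=v2}
    }
}
```

Note that Spark's `dropDuplicates` keeps an arbitrary row per key rather than strictly the first, so treat this as the shape of the operation, not its exact contract.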
