Elasticsearch Index Mapping

NEED HELP!
I am trying to add an array of dictionaries as a field to an existing Elasticsearch cluster. Previously my data looked like this:

  "businessBureauId": "3699",
  "businessRecordId": "3699",
  "businessRecordSource": "gdgtr",
  "businessClusterId": "3699",
  "businessGeoLocation": "44.05719,-79.37828",
  "applicationId": null,
  "enterpriseCustomerId": null,
  "businessCoreFields": {
    "businessName": "wrgrg",
    "cleanBusinessName": "trhaha",
    "cleanBusinessNameTrigram": ["__m", "_mm", "mm_", "__v", "_vi"],
    "cleanBusinessNameSoundex": "M124",
    "isFranchise": 0,
    "zipLog": 1.0,
    "businessTelephoneNumber": "0000000000",
    "businessAddress": {
      "addressLine1": "ehateya",
      "city": "lancaster",
      "stateCode": "PA",
      "postalCode": "17601",
      "countryCode": "USA",
      "isAddressActive": true,
      "isAddressVacant": false,
      "isAddressFormatStandardized": true,
      "addressCategory": "Residential",
      "isCommercialMailReceivingAgency": false,
      "latitude": "44.05719",
      "longitude": "-79.37828",
      "addressType": "primary address",
      "addressId": "erteraehaj"
    },
    "businessContacts": [
      {
        "fullName": "mi",
        "address": {
          "addressLine1": null,
          "city": null,
          "stateCode": null,
          "postalCode": null,
          "countryCode": null,
          "isAddressActive": true,
          "isAddressVacant": false,
          "isAddressFormatStandardized": true,
          "addressCategory": null,
          "isCommercialMailReceivingAgency": false,
          "latitude": null,
          "longitude": null,
          "addressType": null,
          "addressId": null
        },
        "telephoneNumber": null,
        "emailAddress": null,
        "businessRole": "principal",
        "personId": "rthaetha"
      }
    ],
    "employeeCount":7,
    "formationYear":2018,
    "sic4Code":"4444",
    "sic4CodeDescription":"nonclassifiable establishments",
    "naics6Code":"444444",
    "naics6CodeDescription":"Unclassified Establishments",
    "annualRevenue":56789,
    "annualRevenueRange":"",
    "businessReportDate":"2021-01-17",
    "lastUpdateDate":"20210630"
  },
  "businessExtendedFields": {
    "businessAddresses": [
      {
        "addressLine1":"aehjZJ",
        "city":"lancaster",
        "stateCode":"PA",
        "postalCode":"17601",
        "countryCode":"USA",
        "isAddressActive":true,
        "isAddressVacant":false,
        "isAddressFormatStandardized": true,
        "addressCategory": "Residential",
        "isCommercialMailReceivingAgency": false,
        "latitude": "44.05719",
        "longitude": "-79.37828",
        "addressType": "primary address",
        "addressId": "fgegege"
      }
    ]
  }
}

and the mapping looks like this, where I have only defined mappings for 5 fields and let Elasticsearch infer the mappings for the rest of the fields:
"mappings": 
       {
          "properties": {
            "businessBureauId": { "type": "keyword" },
            "businessGeoLocation": { "type": "geo_point" },
            "businessCoreFields.businessName": {
              "type": "text",
              "fields": {
                "soundex": {
                  "type": "text",
                  "analyzer": "soundex_analyzer"
                },
                "trigram": {
                  "type": "text",
                  "analyzer": "trigram_tokenizer_analyzer"
                }
              }
            },
            "businessCoreFields.businessAddress": { "type":  "nested" },
            "businessCoreFields.businessContacts": { "type":  "nested" }
          }
        }
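To see what Elasticsearch actually inferred for the fields that are not mapped explicitly, the resulting mapping can be inspected after indexing a document (a quick sketch; "test_index" is simply the index name used later in this post):

GET /test_index/_mapping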

Everything up to this point works fine.
But as soon as I add a new field called "businessExecutiveNameCoreFields" to the above JSON, the document starts looking like this:
{
  "businessBureauId": "3699",
  "businessRecordId": "3699",
  "businessRecordSource": "gdgtr",
  "businessClusterId": "3699",
  "businessGeoLocation": "44.05719,-79.37828",
  "applicationId": null,
  "enterpriseCustomerId": null,
  "businessCoreFields": {
    "businessName": "wrgrg",
    "cleanBusinessName": "trhaha",
    "cleanBusinessNameTrigram": ["__m", "_mm", "mm_", "__v", "_vi"],
    "cleanBusinessNameSoundex": "M124",
    "isFranchise": 0,
    "zipLog": 1.0,
    "businessTelephoneNumber": "0000000000",
    "businessAddress": {
      "addressLine1": "ehateya",
      "city": "lancaster",
      "stateCode": "PA",
      "postalCode": "17601",
      "countryCode": "USA",
      "isAddressActive": true,
      "isAddressVacant": false,
      "isAddressFormatStandardized": true,
      "addressCategory": "Residential",
      "isCommercialMailReceivingAgency": false,
      "latitude": "44.05719",
      "longitude": "-79.37828",
      "addressType": "primary address",
      "addressId": "erteraehaj"
    },
    "businessContacts": [
      {
        "fullName": "mi",
        "address": {
          "addressLine1": null,
          "city": null,
          "stateCode": null,
          "postalCode": null,
          "countryCode": null,
          "isAddressActive": true,
          "isAddressVacant": false,
          "isAddressFormatStandardized": true,
          "addressCategory": null,
          "isCommercialMailReceivingAgency": false,
          "latitude": null,
          "longitude": null,
          "addressType": null,
          "addressId": null
        },
        "telephoneNumber": null,
        "emailAddress": null,
        "businessRole": "principal",
        "personId": "rthaetha"
      }
    ],
    "employeeCount":7,
    "formationYear":2018,
    "sic4Code":"4444",
    "sic4CodeDescription":"nonclassifiable establishments",
    "naics6Code":"444444",
    "naics6CodeDescription":"Unclassified Establishments",
    "annualRevenue":56789,
    "annualRevenueRange":"",
    "businessReportDate":"2021-01-17",
    "lastUpdateDate":"20210630"
  },
  "businessExtendedFields": {
    "businessAddresses": [
      {
        "addressLine1":"aehjZJ",
        "city":"lancaster",
        "stateCode":"PA",
        "postalCode":"17601",
        "countryCode":"USA",
        "isAddressActive":true,
        "isAddressVacant":false,
        "isAddressFormatStandardized": true,
        "addressCategory": "Residential",
        "isCommercialMailReceivingAgency": false,
        "latitude": "44.05719",
        "longitude": "-79.37828",
        "addressType": "primary address",
        "addressId": "fgegege"
      }
    ]
  },
  "businessExecutiveNameCoreFields": {
    "businessExecName": [
      {
      "fullName": "michael",
      "fullNameTrigram": "__m;_mi;cha;_el",
      "fullNameSoundex": "M245"
      }
    ]
  }
}

Now when I try to load it into the Elasticsearch cluster with the same mapping as above, it gives me the following error:
`java.lang.NullPointerException`

I have also tried giving an explicit mapping, but it still gives me the same error.
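For context, an explicit mapping for the new array, analogous to the existing nested mapping for businessContacts, would look roughly like this (a sketch only; the sub-field types and analyzers are assumptions):

"businessExecutiveNameCoreFields.businessExecName": {
  "type": "nested",
  "properties": {
    "fullName": { "type": "text" },
    "fullNameTrigram": { "type": "text", "analyzer": "trigram_tokenizer_analyzer" },
    "fullNameSoundex": { "type": "text", "analyzer": "soundex_analyzer" }
  }
}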

Can anyone help me resolve this issue?

Welcome!

Could you provide:

  • the full stacktrace
  • a reproduction script
  • the exact version of your cluster

Thanks

Script that I use to load data into the Elasticsearch cluster:

import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.requests.indexes.CreateIndexResponse
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

object ESClient {

  var esClient = createHttpClient(
    elasticConf.getString("es_host"),
    elasticConf.getInt("es_port")
  )

  def createHttpClient(dbHost: String, dbPort: Int): ElasticClient = {
    val uri = s"${dbHost}:${dbPort}"
    logger.info(s"Now creating ES client for  $uri")
    val esClient = ElasticClient(JavaClient(ElasticProperties(uri)))
    logger.info(s"ES Client created for $uri")
    esClient
  }

  def assignNewClient(dbHost: String, dbPort: Int): Unit ={
    esClient = createHttpClient(
      dbHost,
      dbPort
    )
  }
}

val realTimeIndex = """
        {
        "settings": {
            "analysis": {
              "tokenizer": {
                  "trigram_tokenizer": {
                    "type": "ngram",
                    "min_gram": 3,
                    "max_gram": 3,
                    "token_chars": [ "letter", "digit" ]
                  }
              },
              "analyzer": {
                  "trigram_tokenizer_analyzer": {
                    "type": "custom",
                    "char_filter": ["clean_char_filter"],
                    "filter":  ["lowercase", "unique"],
                    "tokenizer": "trigram_tokenizer"
                  },
                  "soundex_analyzer": {
                  "tokenizer": "standard",
                  "filter": ["my_soundex", "unique"]
                }
              },
              "char_filter": {
              "clean_char_filter": {
                "type": "mapping",
                "mappings": [
                  "\\u0020incorporated=>\\u0020",
                  "\\u0020corp=>\\u0020",
                  "\\u0020corporation=>\\u0020",
                  "\\u0020company=>\\u0020",
                  "\\u0020limited=>\\u0020"
                  ]
              }
              },
              "filter": {
                "my_soundex": {
                  "type": "phonetic",
                  "encoder": "soundex",
                  "replace": true
                }
              }
            }
        },
        "mappings": {
          "properties": {
            "businessBureauId": { "type": "keyword" },
            "businessGeoLocation": { "type": "geo_point" },
            "businessCoreFields.businessName": {
              "type": "text",
              "fields": {
                "soundex": {
                  "type": "text",
                  "analyzer": "soundex_analyzer"
                },
                "trigram": {
                  "type": "text",
                  "analyzer": "trigram_tokenizer_analyzer"
                }
              }
            },
            "businessCoreFields.businessAddress": { "type":  "nested" },
            "businessCoreFields.businessContacts": { "type":  "nested" }
          }
        }
      }          
    """.stripMargin

def createCompanyIndex(indexName: String, esUrl: String, esPort: Int): Unit = {
    val apollo_mapping = realTimeIndex
    logger.info(s"Creating ES Index $indexName on $esUrl:$esPort")

    val businessIndex = indexName
    try {
      val client = ESClient.createHttpClient(esUrl,esPort)
      if (!client.execute(indexExists(businessIndex)).await.result.isExists) {
        val resp = client.execute(createIndex(businessIndex).source(apollo_mapping)).await
        resp match {
          case failure: RequestFailure => logger.info(s"Failed  to create ES Index: ${failure.error}")
          case success: RequestSuccess[CreateIndexResponse] => logger.info(s"Index Created - status code is ${success.status}")
        }
      }
    }
    catch {
      case e: Exception =>
        // log the message, print the stack trace, then rethrow
        logger.error(s"Exception caught while creating index: ${e.getMessage}")
        e.printStackTrace()
        throw e
    }
}

// `Business` is the company record case class (name assumed here); it exposes businessRecordSource and businessRecordId
def writeToES(bList: List[Business], client: ElasticClient, businessIndex: String): Option[List[Business]] = {
    try {
      var errorIndex = 0
      val errorMax = 5
      var successful = false
      while (!successful && errorIndex < errorMax) {
        val response = client.execute {
          logger.info(s"Now writing dataset to ES attempt: ${errorIndex + 1} of ${errorMax}")
          bulk(
            bList.map(business => {
              logger.info(s"Printing business ${business} ")
              indexInto(businessIndex)
                .id(business.businessRecordSource + "_" + business.businessRecordId)
                .doc(business)
            }): _*
          )
        }
        val bulkResponse = response.await
        var hasErrors = false
        bulkResponse.foreach { x =>
          if (x.hasFailures) {
            hasErrors = true
            logger.warn(x.failures.toString())
          }
        }
        if (hasErrors) {
          errorIndex += 1
          Thread.sleep(30 * 1000)
        } else {
          successful = true
        }
      }
      if (errorIndex == errorMax) {
        logger.error(s"Write to ES failed in max:${errorMax} attempts")
        Some(bList)
      }
      else {
        logger.info(s"Write to ES batch succeeded in ${errorIndex + 1} attempt.")
        None
      }
    } catch {
      case e: Exception =>
        logger.error(s"Write to ES failed because of exception: ${e.getMessage}")
        e.printStackTrace()
        Some(bList)
    }
}

def generateCompanyDocuments(companyAndPeopleDs: Dataset[Business], esUrl: String, esPort: Int,
                             businessIndex: String, partitionCount: Int, bulkGroupSize: Int
                            )(implicit spark: SparkSession): Dataset[Business] = {
    import spark.implicits._
    companyAndPeopleDs.repartition(partitionCount).mapPartitions(part => {
      val client = ESClient.createHttpClient(esUrl, esPort)
      part.grouped(bulkGroupSize).flatMap(grp => writeToES(grp.toList, client, businessIndex))
    }).flatMap(x => x)
}

createCompanyIndex("test_index", esUrl, esPort)
generateCompanyDocuments(companyDF, esUrl, esPort, "test_index", 100, 10)

full stack trace:

java.lang.NullPointerException
	at play.api.libs.json.LowPriorityWrites$$anonfun$traversableWrites$1$$anonfun$apply$2.apply(Writes.scala:334)
	at play.api.libs.json.LowPriorityWrites$$anonfun$traversableWrites$1$$anonfun$apply$2.apply(Writes.scala:333)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at play.api.libs.json.LowPriorityWrites$$anonfun$traversableWrites$1.apply(Writes.scala:333)
	at play.api.libs.json.LowPriorityWrites$$anonfun$traversableWrites$1.apply(Writes.scala:331)
	at play.api.libs.json.Writes$$anon$6.writes(Writes.scala:157)
	at play.api.libs.json.DefaultFormat$$anon$4.writes(Format.scala:80)
	at play.api.libs.json.PathWrites$$anonfun$nullable$2.apply(JsConstraints.scala:178)
	at play.api.libs.json.PathWrites$$anonfun$nullable$2.apply(JsConstraints.scala:176)
	at play.api.libs.json.OWrites$$anon$3.writes(Writes.scala:126)
	at play.api.libs.json.OFormat$$anon$2.writes(Format.scala:49)
	at play.api.libs.json.OFormat$$anon$5$$anonfun$inmap$2.apply(Format.scala:29)
	at play.api.libs.json.OFormat$$anon$5$$anonfun$inmap$2.apply(Format.scala:29)
	at play.api.libs.json.OFormat$$anon$1.writes(Format.scala:42)
	at com.capitalone.apollo.api.marshallers.ObjectMarshaller$$anonfun$64.apply(ObjectMarshaller.scala:26)
	at com.capitalone.apollo.api.marshallers.ObjectMarshaller$$anonfun$64.apply(ObjectMarshaller.scala:26)
	at play.api.libs.json.OFormat$$anon$1.writes(Format.scala:42)
	at play.api.libs.json.OFormat$$anon$1.writes(Format.scala:38)
	at play.api.libs.json.PathWrites$$anonfun$nullable$2.apply(JsConstraints.scala:178)
	at play.api.libs.json.PathWrites$$anonfun$nullable$2.apply(JsConstraints.scala:176)
	at play.api.libs.json.OWrites$$anon$3.writes(Writes.scala:126)
	at play.api.libs.json.OFormat$$anon$2.writes(Format.scala:49)
	at play.api.libs.json.OWrites$MergedOWrites$.mergeIn(Writes.scala:89)
	at play.api.libs.json.OWrites$MergedOWrites$$anon$1.writeFields(Writes.scala:81)
	at play.api.libs.json.OWrites$MergedOWrites$$anon$1.writeFields(Writes.scala:77)
	at play.api.libs.json.OWrites$OWritesFromFields$class.writes(Writes.scala:109)
	at play.api.libs.json.OWrites$MergedOWrites$$anon$1.writes(Writes.scala:77)
	at play.api.libs.json.OFormat$$anon$2.writes(Format.scala:49)
	at play.api.libs.json.OFormat$$anon$5$$anonfun$inmap$2.apply(Format.scala:29)
	at play.api.libs.json.OFormat$$anon$5$$anonfun$inmap$2.apply(Format.scala:29)
	at play.api.libs.json.OFormat$$anon$1.writes(Format.scala:42)
	at com.capitalone.apollo.api.marshallers.ObjectMarshaller$$anonfun$68.apply(ObjectMarshaller.scala:27)
	at com.capitalone.apollo.api.marshallers.ObjectMarshaller$$anonfun$68.apply(ObjectMarshaller.scala:27)
	at play.api.libs.json.OFormat$$anon$1.writes(Format.scala:42)
	at play.api.libs.json.OFormat$$anon$1.writes(Format.scala:38)
	at play.api.libs.json.Json$.toJson(Json.scala:187)
	at com.capitalone.apollo.api.marshallers.ObjectMarshaller$BusinessIndexable$.json(ObjectMarshaller.scala:43)
	at com.capitalone.apollo.api.marshallers.ObjectMarshaller$BusinessIndexable$.json(ObjectMarshaller.scala:42)
	at com.sksamuel.elastic4s.requests.indexes.IndexRequest.source(IndexRequest.scala:31)
	at com.sksamuel.elastic4s.requests.indexes.IndexRequest.doc(IndexRequest.scala:28)
	at com.capitalone.apollo.api.etl.ApolloDocumentGenerator$$anonfun$1.apply(ApolloDocumentGenerator.scala:145)
	at com.capitalone.apollo.api.etl.ApolloDocumentGenerator$$anonfun$1.apply(ApolloDocumentGenerator.scala:141)
	at scala.collection.immutable.List.map(List.scala:284)
	at com.capitalone.apollo.api.etl.ApolloDocumentGenerator$.writeToES(ApolloDocumentGenerator.scala:141)
	at com.capitalone.apollo.api.etl.ApolloDocumentGenerator$$anonfun$generateCompanyDocuments$1$$anonfun$apply$1.apply(ApolloDocumentGenerator.scala:214)
	at com.capitalone.apollo.api.etl.ApolloDocumentGenerator$$anonfun$generateCompanyDocuments$1$$anonfun$apply$1.apply(ApolloDocumentGenerator.scala:214)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:149)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1164)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Currently I am using OpenSearch version 1.0.

The index is getting created fine; the load fails at the line indexInto(businessIndex) in the writeToES function.

Code to convert the JSON into a DataFrame to be passed to the generateCompanyDocuments function: val df = spark.read.json(sc.parallelize(Seq(json_doc)))
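A minimal, self-contained version of that conversion might look like this (a sketch; json_doc stands for the full document shown above, and the app name/master settings are assumptions for a local run):

import org.apache.spark.sql.SparkSession

object LoadSingleDoc {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("es-load-repro").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // json_doc is the full JSON document shown earlier, as a single string (truncated here)
    val json_doc: String = """{"businessBureauId": "3699", "businessRecordId": "3699"}"""

    // one-element RDD of JSON strings -> DataFrame with an inferred schema
    val df = spark.read.json(sc.parallelize(Seq(json_doc)))
    df.printSchema()
  }
}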

This is not a discussion forum for OpenSearch 🙂

OpenSearch is not Elasticsearch, and the AWS team has made numerous code changes to create this fork. Unfortunately, we aren't able to help here.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.