How to append to an existing list in Elasticsearch using Java, for a given ID, with bulk inserts/updates

Hello Team,

I am using this piece of code to update an Elasticsearch index. Everything is working fine, except that the document contains a list which is being overwritten instead of appended to. I am using a BulkProcessor to insert all the data into the index, and I am updating the content based on the sessionId.

			// one IndexRequest per session document; "kafka" is the mapping type
			IndexRequest request = new IndexRequest(elasticSummaryIndex, "kafka");
			// the document ID is the sessionId, so repeated sends hit the same document
			request.id((String) communicationSessionsMap.get("communicationSessionID"));
			request.source(communicationSessionsMap, XContentType.JSON);
			bulkProcessor.add(request);

I am using a BulkProcessor that flushes every 10,000 requests, or when the batch size reaches 5 MB.
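For reference, a BulkProcessor with those two thresholds could be built like this against the High Level REST Client. The RestHighLevelClient named client and the no-op listener are assumptions for the sketch, not code from this thread:

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
		// hand each assembled batch to the client asynchronously
		(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
		new BulkProcessor.Listener() {
			@Override
			public void beforeBulk(long executionId, BulkRequest request) { }

			@Override
			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

			@Override
			public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
		})
		.setBulkActions(10000)                              // flush every 10,000 requests
		.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // or when the batch reaches 5 MB
		.build();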

Can you please help me out? I need to append new data to the list rather than replace the content as a whole.

The output should look like this:

{
  "_index": "kafka_gw_visibility_summary",
  "_type": "kafka",
  "_id": "HttpServerAdapter_15633763427671124:-999",
  "_version": 7,
  "_score": 0,
  "_source": {
    "protocol": "HTTP",
    "communicationTrackingEvents": [
      {
        "eventName": "CommConnect",
        "startTime": "2019-07-17 15:12:22.767",
        "endTime": "2019-07-17 15:12:22.767"
      },
      {
        "fileSize": "1001",
        "eventName": "CommFileXferBegin",
        "startTime": "2019-07-17 15:12:22.814",
        "transferId": "HttpServerAdapter_15633763427671124:-999-1"
      },
      {
        "eventName": "CommFileXferComplete",
        "endTime": "2019-07-17 15:12:22.814",
        "transferId": "HttpServerAdapter_15633763427671124:-999-1"
      },
      {
        "eventName": "CommAuthorization"
      },
      {
        "fileSize": "41",
        "eventName": "CommFileXferBegin",
        "startTime": "2019-07-17 15:12:23.164",
        "transferId": "HttpServerAdapter_15633763427671124:-999-3"
      },
      {
        "eventName": "CommFileXferComplete",
        "endTime": "2019-07-17 15:12:23.164",
        "transferId": "HttpServerAdapter_15633763427671124:-999-3"
      },
      {
        "eventName": "CommDisconnect",
        "startTime": "2019-07-17 15:12:23.164",
        "endTime": "2019-07-17 15:12:23.164"
      }
    ],
    "communicationSessionID": "HttpServerAdapter_15633763427671124:-999",
    "locallyInitialized": "false",
    "startTime": "2019-07-17 15:12:23.164",
    "endTime": "2019-07-17 15:12:23.164",
    "state": "Initialized",
    "status": "true"
  }
} 

but this is what I actually get:

{
  "_index": "kafka_gw_visibility_summary_new",
  "_type": "kafka",
  "_id": "HttpServerAdapter_156337651275912491:-999",
  "_version": 3,
  "_score": 1,
  "_source": {
    "protocol": "HTTP",
    "communicationTrackingEvents": [
          {
            "eventName": "CommDisconnect",
            "startTime": "2019-07-17 15:12:23.164",
            "endTime": "2019-07-17 15:12:23.164"
          }
    ],
    "communicationSessionID": "HttpServerAdapter_156337651275912491:-999",
    "locallyInitialized": "false",
    "startTime": "2019-07-17 15:15:14.457",
    "endTime": "2019-07-17 15:15:14.457",
    "state": "Initialized",
    "status": "true"
  }
}

Any suggestions will be highly appreciated.

No need to ping people who are not part of the discussion yet. Thanks.

Can you please help me out?

Read this and specifically the "Also be patient" part.

It's fine to answer on your own thread after 2 or 3 days (not including weekends) if you don't have an answer.

Yeah @dadoonet, I know; really good things take a lot of time :slight_smile:

When you want to call the Update API, you need to use an UpdateRequest, not an IndexRequest.

Have a look at:

Thank you @dadoonet for your valuable suggestion. There is still a problem:

bulkProcessor.add(new UpdateRequest(elasticSummaryIndex, "kafka",
		(String) communicationSessionsMap.get("communicationSessionID"))
		.doc(communicationSessionsMap, XContentType.JSON));

Whenever I use an UpdateRequest, no data gets inserted, but whenever I use an IndexRequest the data is inserted successfully. Does this mean an UpdateRequest will not work as an upsert if the data is not present the first time? If so, is there any way to enable upsert from the Java API? I will be awaiting your reply.

@dadoonet Just an update: the above issue is resolved.

bulkProcessor.add(new UpdateRequest(elasticSummaryIndex, "kafka", 
					(String) communicationSessionsMap.get("communicationSessionID"))
					.doc(communicationSessionsMap, XContentType.JSON).docAsUpsert(true)); 

but the list is still being overwritten rather than appended to; the previous data is lost and only the last value remains :frowning:

"communicationTrackingEvents": [
      {
        "eventName": "CommDisconnect",
        "startTime": "2019-07-17 15:26:04.23",
        "endTime": "2019-07-17 15:26:04.23"
      }
    ]  

It should have been:

"communicationTrackingEvents": [
      {
        "eventName": "CommConnect",
        "startTime": "2019-07-17 15:12:22.767",
        "endTime": "2019-07-17 15:12:22.767"
      },
      {
        "fileSize": "1001",
        "eventName": "CommFileXferBegin",
        "startTime": "2019-07-17 15:12:22.814",
        "transferId": "HttpServerAdapter_15633763427671124:-999-1"
      },
      {
        "eventName": "CommFileXferComplete",
        "endTime": "2019-07-17 15:12:22.814",
        "transferId": "HttpServerAdapter_15633763427671124:-999-1"
      },
      {
        "eventName": "CommAuthorization"
      },
      {
        "fileSize": "41",
        "eventName": "CommFileXferBegin",
        "startTime": "2019-07-17 15:12:23.164",
        "transferId": "HttpServerAdapter_15633763427671124:-999-3"
      },
      {
        "eventName": "CommFileXferComplete",
        "endTime": "2019-07-17 15:12:23.164",
        "transferId": "HttpServerAdapter_15633763427671124:-999-3"
      },
      {
        "eventName": "CommDisconnect",
        "startTime": "2019-07-17 15:12:23.164",
        "endTime": "2019-07-17 15:12:23.164"
      }
    ]

Thank you

I believe that the only way is by using a script.
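Background on why the list is replaced: a partial (doc) update merges top-level fields into the stored document, but an array value is swapped out wholesale rather than merged. A scripted upsert is one way to append instead. A minimal sketch, assuming Painless as the script language and a hypothetical newEvents list (a List<Map<String, Object>> of events to append):

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;

// newEvents is an assumed List<Map<String, Object>> of events to append
Map<String, Object> params = new HashMap<>();
params.put("events", newEvents);

Script appendScript = new Script(ScriptType.INLINE, "painless",
		// create the list on first touch, otherwise append to it
		"if (ctx._source.communicationTrackingEvents == null) { "
				+ "ctx._source.communicationTrackingEvents = params.events; "
				+ "} else { ctx._source.communicationTrackingEvents.addAll(params.events); }",
		params);

bulkProcessor.add(new UpdateRequest(elasticSummaryIndex, "kafka",
		(String) communicationSessionsMap.get("communicationSessionID"))
		.script(appendScript)
		// if the document does not exist yet, index the full map as-is
		.upsert(communicationSessionsMap, XContentType.JSON));

Note that when the document does not exist yet, the upsert document is indexed as-is and the script is skipped; later updates go through the script and append.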

Can you help me out with the script? I am getting a NullPointerException, as the updateRequest is always coming back null (although the value is inserted in the index):

updateRequest.script(new Script(ScriptType.INLINE, "ctx._source.communicationTrackingEvents += communicationTrackingEvents", 
						null, communicationSessionsMap));

Thanks again
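A note on the snippet above: the constructor being used is Script(ScriptType type, String lang, String idOrCode, Map<String, Object> params), so the script source is being passed in the lang slot and the code itself is null, which may well be the source of the NullPointerException. Painless also reads parameters through params, not as bare variables. A corrected call might look like this (the "events" params key and the newEvents list are illustrative assumptions):

import java.util.Collections;

// Script(ScriptType type, String lang, String idOrCode, Map<String, Object> params)
updateRequest.script(new Script(ScriptType.INLINE, "painless",
		"ctx._source.communicationTrackingEvents.addAll(params.events)",
		Collections.singletonMap("events", newEvents)));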

No, sorry. I never use scripts. I prefer to control everything in my application: read the existing document with a GET, update it in my app, and send the full document back.

If you need help, please provide a full Kibana Dev Console recreation script, as described in About the Elasticsearch category. Please try to keep the example as simple as possible.
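A minimal sketch of that read-modify-write approach, assuming a RestHighLevelClient named client and a hypothetical newEvents list of incoming events (both assumptions, not code from this thread):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;

String sessionId = (String) communicationSessionsMap.get("communicationSessionID");

// read the current version of the document, if it exists
GetResponse getResponse = client.get(
		new GetRequest(elasticSummaryIndex, "kafka", sessionId), RequestOptions.DEFAULT);

Map<String, Object> source = getResponse.isExists()
		? getResponse.getSourceAsMap()
		: new HashMap<>();

// keep the events already stored, overlay the incoming session fields,
// then append the new events instead of letting them replace the list
@SuppressWarnings("unchecked")
List<Map<String, Object>> events = (List<Map<String, Object>>)
		source.getOrDefault("communicationTrackingEvents", new ArrayList<>());
source.putAll(communicationSessionsMap);
events.addAll(newEvents);
source.put("communicationTrackingEvents", events);

// send the full merged document back
bulkProcessor.add(new IndexRequest(elasticSummaryIndex, "kafka")
		.id(sessionId)
		.source(source, XContentType.JSON));

Note that this read-modify-write is not atomic: two writers updating the same sessionId concurrently can still lose events unless optimistic concurrency control is added on top.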


How come you are storing this data in an array rather than in multiple separate documents? How large can these arrays be? How are you querying this data? How frequently are documents updated?

Hi Christian, there are actually 9-10 separate documents that share a field (sessionId in my case) whose value is equal across all of them. I am applying some aggregation logic and grouping them into a single document, with a field holding a list of all the documents under the same sessionId.

My problem here is that I am not able to append to the list every time new data comes in; instead the list gets replaced, and I am doing a bulk update.

Thank you
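As an illustration of the grouping step described above (the incomingDocs list is hypothetical), the aggregation could look something like this:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// group the individual per-event documents by their shared session ID
Map<String, List<Map<String, Object>>> docsBySession = incomingDocs.stream()
		.collect(Collectors.groupingBy(
				doc -> (String) doc.get("communicationSessionID")));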

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.