Data loss in Elasticsearch under high load

Hi All,

We are facing a strange and critical issue while testing Elasticsearch for high-speed transaction monitoring.

We are using Elasticsearch to ingest high-speed data coming from a JMX MXBean. Our JMX listeners receive objects from the MXBean and, using the Elasticsearch High Level Java REST Client, we send these objects (as JSON) for indexing. Note: we are using bulk inserts with 500 documents at a time.
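
Roughly, the indexing path looks like this (a simplified sketch, not our exact code; client is the RestHighLevelClient and batch holds the listener's objects converted to maps):

    BulkRequest bulk = new BulkRequest();
    for (Map<String, Object> doc : batch) { // a batch of up to 500 documents from the JMX listener
        bulk.add(new IndexRequest("service_transaction_1", "service_transaction_type_1").source(doc));
    }
    BulkResponse response = client.bulk(bulk); // one bulk request per batch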

With a single JMX object it works fine. But when we start load tests, the first few iterations run fine for about 2 minutes (around 150K documents); after some time the listeners keep receiving data, but noticeably fewer documents get indexed into Elasticsearch. For example, I sent 150K documents, but at the end of the load test the document count in Elasticsearch shows only 148K. I see no errors in the logs.

Has anyone faced this issue? How can we resolve it?

I have read that a better design would be to dump the data into a database first and then index it from there using Logstash. Is that the only option left?

Please suggest.

Thanks,
Param

Are you checking the bulk request response for errors and retrying documents that may have failed? How large is your Elasticsearch cluster?

Yes, I am checking the bulk response status. There is no error. I do not have a cluster; it's a single node on a development machine.

Thanks,
Param

Are you checking for documents that failed to be indexed, e.g. because of mapping issues or parsing exceptions? Check the logs for those as well.
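
For example, with the High Level REST Client (a sketch; client is your RestHighLevelClient and bulkRequest the BulkRequest you send):

    BulkResponse bulkResponse = client.bulk(bulkRequest);
    if (bulkResponse.hasFailures()) {
        for (BulkItemResponse item : bulkResponse.getItems()) {
            if (item.isFailed()) {
                // log the per-item failure and collect the document for a retry
                System.err.println(item.getFailureMessage());
            }
        }
    }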

Is it possible that some of the documents you're sending have the same id? For instance, if you use the current timestamp in millis and generate more than one document in the same millisecond, that could happen.
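
For example (illustrative only, not your code), an id derived like this collides whenever two documents are generated in the same millisecond:

    String id = String.valueOf(System.currentTimeMillis()); // two docs in the same ms share this id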

Thanks all for the responses.

I dug a little deeper into the bulk response and am seeing this error:

Failure message : [service_transaction_1/0bFoO7ZIQGuFCpdGT_KvZA][[service_transaction_1][4]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[service_transaction_type_1][df5ea2ea-3278-4724-bce2-89e3dbfdf1aa]: version conflict, document already exists (current version [1])]]
Is failed : true

....

I now understand that I am getting this error because Elasticsearch is trying to create a document that already exists (current version [1]).

The question is: why is Elasticsearch creating the same document twice when I am passing the ID explicitly (in the example above, df5ea2ea-3278-4724-bce2-89e3dbfdf1aa)?

But when I search for this document in Elasticsearch, it says not found.

GET service_transaction_1/service_transaction_type_1/df5ea2ea-3278-4724-bce2-89e3dbfdf1aa

{
  "_index": "service_transaction_1",
  "_type": "service_transaction_type_1",
  "_id": "df5ea2ea-3278-4724-bce2-89e3dbfdf1aa",
  "found": false
}

I am inserting documents as below, passing an explicit ID.

IndexRequest indexRequest = new IndexRequest("service_transaction_1", "service_transaction_type_1", UUID.randomUUID().toString())
        .source(putJsonDocumentTrans(obj.combinedBpelObj, obj.transobj));
indexRequest.opType(DocWriteRequest.OpType.CREATE);
requestBulk.add(indexRequest);

Is there any way to disable versioning completely for my index?

Thanks,
Param

Why do you use an opType? If you're generating a new ID for each document, you don't really need it; that's where the version conflicts come from. Just remove that line. What happens?
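
That is, keep your request but drop the opType line:

    IndexRequest indexRequest = new IndexRequest("service_transaction_1", "service_transaction_type_1", UUID.randomUUID().toString())
            .source(putJsonDocumentTrans(obj.combinedBpelObj, obj.transobj));
    // no indexRequest.opType(...) call; the default op type (index) will not raise version conflicts
    requestBulk.add(indexRequest);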

That might be the reason. I have now changed the API call to pass my ID explicitly, as mentioned in my post from today. Could you please check: do you see any issue in the way I am inserting documents?

Basically, I want only a create operation for each of my insert calls. But I am getting a version conflict and I cannot see why. If I am creating a unique document ID in my call, why should a version conflict occur? :frowning:

See my previous comment

If I remove that line, I see data loss, as in the original issue. I added it today just to make sure Elasticsearch only creates and never updates.

Param

OK, can you try removing your UUID generation and letting ES generate its own IDs? What happens?
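
That is, build the request without an explicit id:

    // No explicit id: Elasticsearch auto-generates a unique one per document
    IndexRequest indexRequest = new IndexRequest("service_transaction_1", "service_transaction_type_1")
            .source(putJsonDocumentTrans(obj.combinedBpelObj, obj.transobj));
    requestBulk.add(indexRequest);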

That is the original design I had; ES was generating its own IDs. In that case, if I send 150K document insert requests, I see only 148-149K documents indexed in Elasticsearch.

POST service_transaction_1/_count

Before every load test, I delete the index.

Param

When getting the document count at the end of the benchmark, do you wait for a final refresh to occur?

Yes, I tried that as well.

Before my load test runs, I execute:

PUT bpel_mon_event/_settings
{
  "index" : {
    "refresh_interval" : "-1"
  }
}

and once the load test is finished I set it back to 1s:
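
PUT bpel_mon_event/_settings
{
  "index" : {
    "refresh_interval" : "1s"
  }
}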

Still, the count is not the same as what I am sending.

Param

The index is called service_transaction_1, but you're modifying the refresh_interval of bpel_mon_event? Or is that an alias?

Sorry, service_transaction_1.

We have two indexes receiving data; I pasted the query for one of them.

The big problem is that the behaviour is not consistent. Sometimes I get no errors when sending the ID from my API; sometimes I get version conflicts.

If I remove OpType CREATE, I see documents go missing.

Should I use Logstash in this case? What would you suggest? But if I use Logstash, the data will not be real time, as I would configure the Logstash script to run at some interval.

Param

Unless you need to update documents based on an external ID, I would recommend letting Elasticsearch create the document IDs, as this gives better indexing throughput. Once you set the refresh_interval back to 1s, you need to wait a while to ensure that a refresh takes place before retrieving the document count. You may also be able to force a refresh at the end of the benchmark using the refresh API before you get the document count.
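
For example, before taking the count:

POST service_transaction_1/_refresh
GET service_transaction_1/_count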

I tried this as well, but no luck :frowning: ..

I am sharing my program. If anyone can review it and share thoughts on what I am doing wrong, please do.

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

/**
 * @author Param
 */
public class SampleLoadProgram {

    private static final Logger LOGGER = Logger.getLogger("SampleLoadProgram");
    private static final String ELASTIC_HOST = "127.0.0.1:9200:http";

    public static void main(String[] args) throws IOException {
        BulkRequest requestBulk = new BulkRequest();
        Set<String> instanceid = new HashSet<>(); // Maintains the unique instance ids we generate
        // Preparing the HttpHost array (comma-separated host:port:scheme entries)
        String[] hostArr = ELASTIC_HOST.split(",");
        HttpHost[] hosts = new HttpHost[hostArr.length];
        int index = 0;
        for (String hostString : hostArr) {
            String[] hostSArr = hostString.split(":");
            int port = Integer.parseInt(hostSArr[1]);
            hosts[index++] = new HttpHost(hostSArr[0], port, hostSArr[2]);
        }
        // Initializing the client
        RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(hosts));
        // Change the max length to increase load
        for (int i = 0; i < 10000; i++) {
            String iidstr = UUID.randomUUID().toString();
            instanceid.add(iidstr);
            for (int j = 0; j < 5; j++) {
                Map<String, Object> data = new HashMap<>();
                data.put("trans_id", iidstr);
                data.put("j_id", UUID.randomUUID().toString());
                IndexRequest indexRequest = new IndexRequest("service_transaction", "service_transaction_type")
                        .source(data);
                requestBulk.add(indexRequest);
            }
            // Push a batch of 500 documents (100 outer iterations x 5 docs) and start a new one
            if ((i + 1) % 100 == 0) {
                sendBulk(client, requestBulk);
                requestBulk = new BulkRequest();
            }
        }
        // Flush whatever is left in the last partial batch
        if (requestBulk.numberOfActions() > 0) {
            sendBulk(client, requestBulk);
        }
        client.close();
        LOGGER.log(Level.INFO, "Total expected unique instance count : {0}", instanceid.size());
    }

    private static void sendBulk(RestHighLevelClient client, BulkRequest bulk) throws IOException {
        BulkResponse response = client.bulk(bulk);
        if (response.hasFailures()) { // Surface per-item failures instead of losing them silently
            LOGGER.warning(response.buildFailureMessage());
        }
    }
}

To fetch unique trans_id values, I am using the following GET query (maybe something is wrong here):

GET service_transaction/service_transaction_type/_search
{
  "size": 0,
  "aggs": {
    "total_trans_id": {
      "cardinality": { "field": "trans_id.keyword" }
    }
  }
}

Please help.

Param

With the cardinality aggregation, beware that the count is approximate.
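
If you can bound the number of unique values, the precision_threshold option (maximum 40000) makes the count nearly exact below that threshold, e.g.:

GET service_transaction/service_transaction_type/_search
{
  "size": 0,
  "aggs": {
    "total_trans_id": {
      "cardinality": {
        "field": "trans_id.keyword",
        "precision_threshold": 40000
      }
    }
  }
}

Above the threshold the count becomes approximate again.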

Ohh...

Thanks.

What is the correct way to fetch an exact unique count in my case? Could you please guide me?

The document count now comes out correct after an explicit refresh:

POST /service_transaction/_refresh

But my GET query is still not showing the correct count; as you said, it is approximate. Unfortunately, we need the exact count of unique transactions.

This is the response now:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 50000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "total_trans_id": {
      "value": 9962
    }
  }
}

hits.total is the document count, and that count is perfect.

But total_trans_id should be 10000, yet it shows 9962.

Param