Elasticsearch upgraded from 1.7.1 to 2.3.2, then index creation is very slow

When I used Elasticsearch 1.7.1, the Java job finished in about one hour; after I upgraded ES to 2.3.2, the same job takes more than three hours.
Can anyone help? Thanks very much. (I am using the bulk API.)

The only change in the Java code is how the client is created:
// Previous 1.x client (the ImmutableSettings API was removed in 2.x):
// Settings settings = ImmutableSettings.settingsBuilder().put(pro).build();
// Client client = new TransportClient(settings);
// 2.x style with hardcoded hosts, kept for reference:
// Client client = TransportClient.builder().build()
//         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
//         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
Settings settings = Settings.settingsBuilder().put(pro).put("indices.store.throttle.type", "none").build();
Client client = TransportClient.builder().settings(settings).build();
for (String ip : pro.getProperty("cluster.nodes").split(",")) {
    ((TransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), 9300));
}
BulkRequestBuilder bulkRequest = client.prepareBulk();

Can you provide more details about the bulk requests? How many documents are you sending per bulk request? What is the approximate size of each bulk request (in MB)? Are there any other relevant details that you can share?

public Iterable<Long> call(Iterator<String> t) throws Exception {
    String log = "[" + Math.abs(new Random().nextInt()) + "]";
    long count = 0;
    String txt = null;
    String[] id_js = null;
    String index = pro.getProperty("es.index", "dm") + "_" + date;
    String type = pro.getProperty("es.type", "sketch");
    int bulkSize = Integer.parseInt(pro.getProperty("es.batch.size", "200"));

    // Previous 1.x client (the ImmutableSettings API was removed in 2.x):
    // Settings settings = ImmutableSettings.settingsBuilder().put(pro).build();
    // Client client = new TransportClient(settings);
    // Client client = TransportClient.builder().build()
    //         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
    //         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
    Settings settings = Settings.settingsBuilder().put(pro).put("indices.store.throttle.type", "none").build();
    Client client = TransportClient.builder().settings(settings).build();
    for (String ip : pro.getProperty("cluster.nodes").split(",")) {
        ((TransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), 9300));
    }
    BulkRequestBuilder bulkRequest = client.prepareBulk();

    long batchSize = 0;
    while (t.hasNext()) {
        txt = t.next();
        batchSize += txt.length();
        // strip the surrounding brackets, then split into document id and JSON source
        id_js = txt.substring(1, txt.length() - 1).split(",", 2);
        bulkRequest.add(client.prepareIndex(index, type, id_js[0]).setSource(id_js[1]));
        if (++count % bulkSize == 0) {
            batch(bulkRequest, log + count + "_" + (int) (batchSize / 1024));
            bulkRequest = client.prepareBulk();
            batchSize = 0;
        }
    }
    if (count % bulkSize != 0) {
        System.out.println(log + " ### last commit");
        batch(bulkRequest, log + count + "_" + (int) (batchSize / 1024));
    }
    client.close();
    return Arrays.asList(count);
}

public void batch(BulkRequestBuilder bulkRequest, String log) {
    long failCnt = 0;
    long rejCnt = 0;
    while (true) {
        try {
            if (bulkRequest.execute().actionGet(pro.getProperty(Conf.ES_ACTION_TIMEOUT)).hasFailures()) {
                // some items in the bulk failed (e.g. rejected); wait a second and retry the whole batch
                try {
                    System.err.println(log + " *** batch has been rejected " + ++rejCnt);
                    Thread.sleep(1000);
                    continue;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            break; // the bulk succeeded; exit the retry loop
        } catch (ElasticsearchException e) {
            try {
                System.err.println(log + " *** fail to batch " + ++failCnt);
                e.printStackTrace();
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }
}
}

The job indexes 65,543,640 documents, about 140 GB in total (roughly 2 KB per document, so a 200-document bulk is only about 0.45 MB).
I only changed the way the client is created, because ES 2.3.2 no longer has the old ImmutableSettings API.
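
Note that the retry loop in batch() re-executes the entire BulkRequestBuilder, including the items that already succeeded, whenever any single item fails. If bulk handling turns out to be part of the problem, the 2.x client ships a BulkProcessor that does the batching and retry-with-backoff instead. A minimal sketch, with illustrative (untuned) limits; the class and method names here are mine, not from the job above:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class BulkProcessorSketch {

    public static BulkProcessor build(Client client) {
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // nothing to do before each bulk
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    // only the failed items need handling here, not the whole batch
                    System.err.println("bulk " + executionId + " had failures: "
                            + response.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                System.err.println("bulk " + executionId + " failed entirely");
                failure.printStackTrace();
            }
        })
        // illustrative limits: flush every 1000 actions or 5 MB, whichever comes first
        .setBulkActions(1000)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        // retry bulks rejected by a full queue, with exponential backoff
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 5))
        .build();
    }
}

Documents would then be fed in with processor.add(client.prepareIndex(index, type, id).setSource(json).request()), and the processor drained with awaitClose(...) at the end of the partition, replacing the manual modulo bookkeeping.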

Are you allowing the default value to be set to 200, or are you setting this system property to some other value? If so, what is that value?

Do all the documents have the same fields?

The bulk size is set to 200. I tried changing it, but it did not help; the job still takes about 3 hours.
All the documents have the same fields.

Hey @tingking23,

Would you mind if I add to this discussion? I am facing much the same issue.

@jasontedor
An indexing job that used to finish in about 4 hours on ES 1.4.5 is taking about 9 hours on ES 2.3.3.

Then I tried profiling each indexing request on ES 1.4.5 and ES 2.3.3.
I have two EC2 instances with exactly the same config, one running ES 1.4.5 and one running ES 2.3.3.
I indexed 100 very simple documents using Python code running on each server and hitting the localhost ES server.
With refresh = -1, number of shards = 1, and number of replicas = 0, the times for ES 1.4.5 are about 1 ms per indexing request, while they are about 3.5 ms for ES 2.3.3.

Am I doing something wrong, or is this kind of expected?

Thanks.

Can you share the contents of a single document, made anonymous if necessary (I just want to see the rough shape and content)? Can you share the mapping for sketch? What settings do you have set on the nodes host1 and host2? How much RAM do they have and how much is allocated to the Elasticsearch heap? Finally, what is your storage media, spinning or solid-state (and is it local to the hosts)?

I cannot share a document itself; I just know they contain things like customer name, phone number, and address.

Here is the Elasticsearch config:
index.cache.field.type: "soft"
index.cache.field.max_size: 50000
index.cache.field.expire: 10m
index.number_of_shards: 4
indices.memory.index_buffer_size: 30%
bootstrap.mlockall: true

Here is the Java config:
es.batch.size=200
es.action.timeout=50000

The ES environment:
ES_MAX_MEM=10g
ES_MIN_MEM=10g
ES_HEAP_SIZE=20g

Java code:
public static final String ES_INDEX = "es.index";
public static final String ES_TYPE = "es.type";
public static final String ES_HDP_SHARE = "es.cluster.share";
public static final String ES_BATCH_SIZE = "es.batch.size";
public static final String ES_ACTION_TIMEOUT = "es.action.timeout";
public static final String TEMP_FILE_PATH = "temp_file_path";
public static final String TAG_SCHEMA = "tag.schema";
public static final String TAG_TABLES = "tag.tables";

public static final String DEFAULT_ES_INDEX = "dm";
public static final String DEFAULT_ES_TYPE = "sketch";
public static final String DEFAULT_ES_HDP_SHARE = "false";
public static final String DEFAULT_ES_BATCH_SIZE = "8192";
public static final String DEFAULT_ES_ACTION_TIMEOUT = "5000";
public static final String DEFAULT_TEMP_FILE_PATH = "/tmp/sample";

@jasontedor

The documents use this mapping:

{
  "test-doc-type": {
    "properties": {
      "id": { "type": "long" },
      "delivery_type": { "type": "string" },
      "name": { "type": "string" },
      "address": { "type": "string" }
    }
  }
}

I generated the documents using this simple code (assuming urllib3 for the bare http.request(...) call):

import datetime
import json
import urllib3

http = urllib3.PoolManager()
index_name = 'test-index'   # placeholder index name
doc_type = 'test-doc-type'
logfile = open('index_times.log', 'w')

def getDocument(id, delivery_type, name, address):
    return {
        "id": id,
        "delivery_type": delivery_type,
        "name": name,
        "address": address
    }

# index 100 documents one at a time, logging the latency of each PUT
for i in range(1, 101):
    body = getDocument(i, 'd' * i, 'name', 'a' * i)
    t_s = datetime.datetime.now()
    http.request('PUT', 'http://localhost:9200/' + index_name + '/' + doc_type + '/' + str(i),
                 body=json.dumps(body))
    t_e = datetime.datetime.now()
    logfile.write('indexed document id : %d in %s\n' % (i, str(t_e - t_s)))

For both ES installations I used the default configs. The only changes I made were to the number of shards and the number of replicas.

Both machines are EC2 m4.4xlarge instances, with 64 GB RAM, EBS-backed.

Since the data is so small and there are no refreshes, I am thinking the storage media should not matter much. In any case, the media is exactly the same for both instances.

Thanks.

One of the main differences I can think of is the fact that we now fsync every request.

It might be worth increasing the bulk size.

I do not see an fsync happening after every request. I indexed 100 docs with refresh = -1, and after indexing I did not trigger any refresh from code.
At this point the translog had entries, but no .cfs or .cfe files had been created.

I then manually hit the refresh API and could see a .cfs file being created, but the segment had the attributes search=true and commit=false.

Then I did a flush and could see the segment attribute change to commit=true.

AFAIK this is the expected behavior, and I could see the same happening in ES 1.4.5 and ES 2.3.3, so I doubt that an fsync is being done on each indexing request.

If I am missing something here, please let me know.

Thanks.
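
For reference, that same refresh/flush/segments check can be scripted against the Java admin client; a minimal sketch, assuming a hypothetical index name:

import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.client.Client;

public class SegmentCheckSketch {
    public static void check(Client client) {
        String index = "test-index"; // hypothetical index name

        // refresh: makes new segments visible to search (search=true, commit=false)
        client.admin().indices().prepareRefresh(index).get();

        // flush: commits segments to disk and clears the translog (commit=true)
        client.admin().indices().prepareFlush(index).get();

        // inspect the segments, as the _segments REST endpoint does
        IndicesSegmentResponse segments = client.admin().indices().prepareSegments(index).get();
        System.out.println(segments.getIndices().keySet());
    }
}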

That's exactly why I was asking all the questions I was asking. To clarify though, we fsync the translog on every request.

I think that you're misunderstanding exactly what is fsync'ed with each request: it is just the translog.

So you mean that the translog was not getting fsynced per request in ES 1.4.5 but it is in ES 2.3.3, and that is the penalty per indexing request. Makes sense.

But if the translog is not fsynced per request in ES 1.4.5, how do we ensure that nothing is lost (and is recoverable) if the server crashes?

I came across the index.gateway.local.sync setting for ES 1.4.5 and the index.translog.durability setting for ES 2.3.3. They kind of explain what you mentioned.
Thanks @jasontedor.

Correct.

You can't, this is why per-request fsyncs were added in the 2.x series.

These are the correct settings.
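
For illustration, index.translog.durability is a dynamic index setting; a minimal sketch of switching it from the 2.x Java client, with a hypothetical index name (async trades back the per-request durability discussed above):

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class TranslogDurabilitySketch {
    public static void setAsync(Client client) {
        // "async" fsyncs the translog on an interval (5 s by default) instead of on
        // every request, at the cost of possibly losing the last few seconds of
        // operations if the node crashes
        client.admin().indices().prepareUpdateSettings("test-index") // hypothetical index name
                .setSettings(Settings.settingsBuilder()
                        .put("index.translog.durability", "async")
                        .build())
                .get();
    }
}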

I tried updating index.translog.durability to async. The GET /_settings API shows the updated setting, but I am still incurring 100 translog operations (as shown by the index status API) for 100 indexing requests.

Am I missing something?

Thanks.
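
For reference, a minimal sketch of reading that translog operation count from the Java client, again with a hypothetical index name:

import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.client.Client;

public class TranslogStatsSketch {
    public static void print(Client client) {
        String index = "test-index"; // hypothetical index name
        IndicesStatsResponse stats = client.admin().indices().prepareStats(index)
                .setTranslog(true) // request only translog stats
                .get();
        // number of operations currently held in the translog
        System.out.println(stats.getTotal().getTranslog().estimatedNumberOfOperations());
    }
}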

It does, exactly because, as we have been discussing, the translog is fsync'ed per request in the 2.x series but not in the 1.x series.

Note also that "there are no refreshes" is slightly incorrect. In the 2.x series, we do a refresh when the indexing buffer fills up.

Yes, what you're missing is that we still write to the translog; we just fsync it asynchronously.

I actually wanted to mention that my per-request indexing times are not coming down much (to become comparable to those of ES 1.4.5), but I guess that is the most I can get.

Right, so the reason that I was asking so many questions earlier is that my first thought on the initial post was that this was due to the fsync change, but I wasn't convinced that it accounts for a change of this magnitude (one hour to three hours, i.e. 3x).

Can you grab hot threads on the nodes while you're doing the bulk ingestion?
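
For anyone doing the same, a minimal sketch of pulling hot threads from the Java client (the REST equivalent is GET /_nodes/hot_threads); run it while the bulk job is active:

import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.client.Client;

public class HotThreadsSketch {
    public static void dump(Client client) {
        NodesHotThreadsResponse response = client.admin().cluster()
                .prepareNodesHotThreads() // all nodes; pass node ids to narrow it down
                .get();
        for (NodeHotThreads node : response.getNodes()) {
            System.out.println(node.getNode().getName());
            System.out.println(node.getHotThreads());
        }
    }
}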