Elasticsearch: Slow performance on large data set

I have a 7-node Elasticsearch (v2.3.3) cluster with 2 indices, both with nested object mappings. I am seeing a significant delay when inserting into Index2 (through Spark Streaming v1.6). I am using bulk inserts, which take ~8-12s per batch (~100k records).

Node Configuration:
RAM: 64 GB
Cores: 48
HDD: 1 TB
JVM allocated memory: 32 GB

Marvel Node Status:
CPU usage: ~10-20%
JVM memory: ~60-75%
Load average: ~3-35
Indexing Rate: ~10k/s
Search Rate: ~2k/s

Index1 (Replication 1): 
Status: green
Documents: 84.4b
Data: 9.3TB
Total Shards: 400 (could this be the reason for the low performance?)

Index2 (Replication 1): 
Status: green
Documents: 1.4b
Data: 35.8GB
Total Shards: 10
Unassigned Shards: 0

Spark streaming configuration:
Executors: 2
Cores per executor: 8
Number of partitions: 16
Batch size: 10s
Events per batch: ~1k-200k
Elasticsearch bulk insert count: 100k

Index2 mapping:

{
  "settings": {
    "index": {
      "number_of_shards": 5,
      "number_of_replicas": 1
    }
  },
  "mappings": {
    "parent_list": {
      "_all": {
        "enabled": false
      },
      "properties": {
        "parents": {
          "type": "nested",
          "properties": {
            "parent_id": {
              "type": "integer",
              "doc_values": false
            },
            "childs": {
              "type": "nested",
              "properties": {
                "child_id": {
                  "type": "integer",
                  "doc_values": false
                },
                "timestamp": {
                  "type": "long",
                  "doc_values": false
                },
                "is_deleted": {
                  "type": "boolean",
                  "doc_values": false
                }
              }
            }
          }
        },
        "other_ID": {
          "type": "string",
          "index": "not_analyzed",
          "doc_values": false
        }
      }
    }
  }
}
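
For illustration, a single document under this mapping looks roughly like this (sample values only, not real data):

{
  "other_ID": "abc-123",
  "parents": [
    {
      "parent_id": 42,
      "childs": [
        { "child_id": 7, "timestamp": 1483430400000, "is_deleted": false },
        { "child_id": 8, "timestamp": 1483430460000, "is_deleted": true }
      ]
    }
  ]
}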

My queries:

  1. Get the count by parent ID, with at least one child with is_deleted false.
  2. Get the count by child ID, with is_deleted false.
  3. Get documents by _id.
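
For example, query type 2 looks roughly like this in Sense (a simplified sketch of the aggregation, not the exact production query; the aggregation names are placeholders):

POST /Index2/parent_list/_search
{
  "size": 0,
  "aggs": {
    "parents": {
      "nested": { "path": "parents" },
      "aggs": {
        "childs": {
          "nested": { "path": "parents.childs" },
          "aggs": {
            "active_childs": {
              "filter": { "term": { "parents.childs.is_deleted": false } },
              "aggs": {
                "count_by_child": {
                  "terms": { "field": "parents.childs.child_id" }
                }
              }
            }
          }
        }
      }
    }
  }
}

Query type 1 is built along the same lines, with a terms aggregation on parents.parent_id combined with a filter on parents.childs.is_deleted.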
    

I was expecting better performance from Elasticsearch, but it has become the bottleneck of my system. Can someone suggest performance tuning? Can we achieve a higher insert rate with this cluster configuration?

Link to Stack Overflow: stackoverflow

How many parent documents do you have per index? How many nested child documents are there on average per parent? How large a portion of the bulk inserts are updates? What do disk IO and iowait look like on the nodes during indexing?

Is there any evidence of long GC or merge throttling in the Elasticsearch logs?

Index1 has 2B parents (count in Sense) and 40B total documents (document count in Marvel).
Index2 has 110M parents and 700M total documents.
I tried 10k and 100k bulk inserts (most of them, ~90-95%, are updates) into Index2.

I have added the Marvel node status figures to the question.

I haven't checked GC or merge throttling in the Elasticsearch logs yet.

Each update request will result in a search as well as a number of writes, as each nested document is internally represented by a separate document. All these updates will cause a lot of segment merging, which results in a lot of disk IO. Combine this with a reasonably high query rate and I would be surprised if disk IO is not what is limiting performance, especially since you seem to be using spinning disks.

Look for signs of merge throttling in the logs but also look at IO performance e.g. using iostat. For this type of use case you would most likely be better off using SSDs.
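
For example, something along these lines shows merge activity via the indices stats API and disk utilisation via iostat (the 5-second interval is just an example):

GET /Index2/_stats/merge,refresh,flush

iostat -x 5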

I will check the IO stats. Meanwhile, can you tell me what throughput should be expected with this configuration and schema? And what if I use SSDs? Should I use a parent-child relation instead of nested documents?

There are a lot of merge throttling messages in the logs:

[2016-12-22 10:34:21,091][INFO ][index.engine ] [host_name] [Index2][4] now throttling indexing: numMergesInFlight=10, maxNumMerges=9

There are some errors and exceptions as well.

[Index1][[Index1][0]] FlushFailedEngineException[Flush failed]; nested: IllegalStateException[this writer hit an unrecoverable error; cannot commit]; nested: OutOfMemoryError[Java heap space];
Caused by: java.lang.IllegalStateException: this writer hit an unrecoverable error; cannot commit
Caused by: java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space

[2016-09-22 00:40:27,382][WARN ][cluster.action.shard ] [host_name] [Index1][0] received shard failed for target shard [[Index1][0], node[KCti7QjiT9yeqbNu9mBe-w], [P], v[82], s[STARTED], a[id=yGqpeHVlQFCw-hRQ5t31PA]], indexUUID [T4vVith5TnuRSnOsiiowjA], message [engine failure, reason [already closed by tragic event on the index writer]], failure [OutOfMemoryError[Java heap space]]
java.lang.OutOfMemoryError: Java heap space

[Index1][[Index1][0]] IndexFailedEngineException[Index failed for [Index1#]]; nested: OutOfMemoryError[Java heap space];
Caused by: java.lang.OutOfMemoryError: Java heap space
[2016-09-22 01:00:00,372][ERROR][marvel.agent.exporter.local] local exporter [default_local] - failed to delete indices
[.marvel-es-1-2016.09.15] IndexNotFoundException[no such index]

[2016-09-22 10:55:25,381][WARN ][cluster.action.shard ] [host_name] [Index1][0] received shard failed for target shard [[Index1][0], node[Uk8GIUInTB2C6nemqpkYXg], [P], v[86], s[STARTED], a[id=Xt6NfifcQu2zcedOQauq8g]], indexUUID [T4vVith5TnuRSnOsiiowjA], message [engine failure, reason [out of memory (source: [index])]], failure [OutOfMemoryError[Java heap space]]
java.lang.OutOfMemoryError: Java heap space

That is a clear indication that disk IO is the bottleneck.

Before discussing modelling options I would like to know a bit more about your use case and query and access patterns.

  1. When a document is updated through a bulk request, how many child documents are typically added/deleted/updated?
  2. Are the 3 types of queries you listed equally common? If not, what is the distribution between them? Does this vary depending on which index they query?
  3. How many results are typically counted/returned for queries of type 1 and 2?
  4. How many different child IDs do you have?
  5. What does this data represent?

A few corrections:

  1. The last JVM out of memory log is very old (from the initial setup, 3 months ago). I haven't had any out of memory errors recently.
  2. I am using SSDs.

elasticsearch.yml

path.data: /drive1,/drive2,/drive3

iostat output under heavy load (100k records total per 10s Spark Streaming batch, 10k records per bulk request, read and then updated):

[root@host_name]# df -kh
Filesystem            Size  Used Avail Use% Mounted on
/dev/sda3             916G  1.9G  867G   1% /
tmpfs                  32G     0   32G   0% /dev/shm
/dev/sda1             248M   66M  171M  28% /boot
/dev/sdb1             880G  447G  389G  54% /drive1
/dev/sdc1             880G  446G  390G  54% /drive2
/dev/sdd1             880G  438G  398G  53% /drive3

[root@host_name ~]# iostat
Linux 2.6.32-642.3.1.el6.x86_64 (host_name)   01/03/2017   x86_64   (48 CPU)

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          11.53    0.00    1.50    2.06    0.00   84.91

Device:            tps   Blk_read/s   Blk_wrtn/s     Blk_read     Blk_wrtn
sda               2.82       111.01         8.93    1601686496    128792394
sdb            1333.28     53400.79     29806.18  770507830308 430066522656
sdd            1299.42     51785.25     28747.67  747197513340 414793555416
sdc            1332.80     53452.75     29554.23  771257533804 426431221632

Please find inline answers:

  1. When a document is updated through a bulk request, how many child documents are typically added/deleted/updated?

For each Spark batch, I get a list of _IDs, multiget all of them from Elasticsearch, modify them (usually 2-3 children per _ID get added/updated, no deletes), and then bulk upsert (see the sketch after these answers).

  2. Are the 3 types of queries you listed equally common? If not, what is the distribution between them? Does this vary depending on which index they query?

As you can see from the 1st answer, Spark uses the 3rd query every 10s. There is also a batch job (cron) that runs every 30 min (because the first 2 queries currently take 17 min for all parents and children; it is a Python script and needs improvement as well), but it really needs to run every 5-10s.

  3. How many results are typically counted/returned for queries of type 1 and 2?

In general 20-50K for both types, but close to 10% of them return 10-100M.

  4. How many different child IDs do you have?

On average there will be 1-2k parents and each will have ~3-5 children. Each parent and child ID is unique.

  5. What does this data represent?

You can consider each child as a file ID, each _ID as an entry in a file, and each parent as a folder (this analogy is close to the actual use case).
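
In Sense terms, the per-batch flow from answer 1 is roughly the following (IDs and field values are made up; the real job goes through the Spark connector rather than raw HTTP):

POST /Index2/parent_list/_mget
{
  "ids": ["device_1", "device_2"]
}

POST /Index2/parent_list/_bulk
{ "update": { "_id": "device_1" } }
{ "doc": { "parents": [ { "parent_id": 42, "childs": [ { "child_id": 7, "timestamp": 1483430400000, "is_deleted": false } ] } ] }, "doc_as_upsert": true }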

What does iostat -x show during heavy indexing?

[root@host_name 0]# iostat -x
Linux 2.6.32-642.3.1.el6.x86_64 (host_name)  01/03/2017  _x86_64_  (48 CPU)

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          11.53    0.00    1.50    2.06    0.00   84.91

Device:         rrqm/s   wrqm/s     r/s     w/s   rsec/s   wsec/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
sda               0.88     0.52    2.23    0.60   110.99     8.93    42.46     0.01    1.84    2.18    0.60   1.13   0.32
sdb               0.14  3069.91  677.83  655.21 53391.54 29800.99    62.41     0.13    0.10    0.23    0.41   0.15  19.68
sdd               0.20  2960.10  666.37  632.83 51776.28 28743.42    61.98     0.25    0.19    0.05    0.33   0.15  19.10
sdc               0.13  3039.15  678.06  654.51 53443.58 29549.28    62.28     0.05    0.04    0.16    0.36   0.15  19.49

I got merge throttling messages again:

[2017-01-03 12:12:45,741][INFO ][index.engine ] [host_name] [Index2][1] now throttling indexing: numMergesInFlight=10, maxNumMerges=9
[2017-01-03 12:12:45,750][INFO ][index.engine ] [host_name] [Index2][1] stop throttling indexing: numMergesInFlight=8, maxNumMerges=9

One way to reduce merging and IO could be to increase the refresh interval of the indices if you have not already done so. As your workload is batch-oriented, you may even disable it and issue a separate refresh request once a batch has completed. This will however make updates take longer to show up in results when running count queries.
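
For example, the refresh interval can be changed dynamically (the 30s value is just an example), or set to -1 to disable it, followed by an explicit refresh once a batch has completed:

PUT /Index2/_settings
{ "index": { "refresh_interval": "30s" } }

PUT /Index2/_settings
{ "index": { "refresh_interval": "-1" } }

POST /Index2/_refresh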

As this is not a typical use case I come across often, it is hard to tell what kind of performance can be expected, or even which data model is best for the job.

Assuming that queries of type 3 are more numerous than queries of type 1 and 2, switching to parent-child may not improve performance, as more separate documents will need to be retrieved.

As far as I can see, query types 1 and 3 should be possible to answer based on the current data structure even without a nested mapping, as flattening the document would still give the correct result. One option could therefore be to index the parent documents with the child documents embedded (parent ID as key), but not mapped as nested, in one index, and to store each child document (combination of parent ID and child ID as key), with the parent ID denormalised, in a separate index.

The first index would be used to serve queries of type 1 and 3, while the second index with the child documents would be used to serve queries of type 2. Depending on how the queries are structured, it may be possible to make this even more efficient through the use of custom routing. This approach requires updating a larger number of simple documents rather than a smaller number of nested, and therefore more complex, documents, and may or may not be more efficient.
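
As a rough sketch of that alternative (index, type, and field names below are made up for illustration, not a recommendation): the first index keeps the current structure but maps parents/childs as plain objects instead of nested, and the second index holds one flat document per parent/child combination with the parent ID denormalised.

PUT /parents_flat
{
  "mappings": {
    "parent_list": {
      "_all": { "enabled": false },
      "properties": {
        "parents": {
          "properties": {
            "parent_id": { "type": "integer" },
            "childs": {
              "properties": {
                "child_id": { "type": "integer" },
                "timestamp": { "type": "long" },
                "is_deleted": { "type": "boolean" }
              }
            }
          }
        },
        "other_ID": { "type": "string", "index": "not_analyzed" }
      }
    }
  }
}

PUT /childs_flat
{
  "mappings": {
    "child": {
      "_all": { "enabled": false },
      "properties": {
        "parent_id": { "type": "integer" },
        "child_id": { "type": "integer" },
        "timestamp": { "type": "long" },
        "is_deleted": { "type": "boolean" }
      }
    }
  }
}

Documents in the second index could, for example, be routed on the parent ID so that queries scoped to a single parent only hit one shard.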

The only way to determine which model is most efficient for your use case will however be to benchmark them and compare.

I will try to explain my use case.

In the above schema, a parent is a user, a child is a file (that contains, say, only deviceIDs), and _ID is a deviceID.

Each user (we have ~1-5k users) can upload and delete their files (upload/delete is through a web portal; we maintain those entries in MySQL, e.g. user_id, file_id, file URL, and stream the file entries to Spark). Normally a file contains 10-50K deviceID entries, but the upper limit is 100M per user and we sometimes do get such files (say ~5-10%). Each user normally uploads/deletes 2-5 files per day. There can be overlap in deviceIDs between 2 users or even 2 files.

Users can also stream deviceIDs directly to Spark with their user_id, a default file_id, and an action saying remove or add. The data volume through direct streaming is ~1-5k/s.

Now our requirements are:

  1. Show the total count of active (added and not removed) deviceIDs per user, and restrict once the per-user upper limit is reached. This may be delayed by 5-10s.
  2. Show the total active deviceIDs for a user (as a downloadable file). This may be delayed by 5-30s.
  3. Maintain an active deviceID -> List(user_ids) map in Aerospike (Spark writes it). This should be real time, or as close as possible.

Currently 1 and 2 are cron jobs and 3 is done through Spark Streaming.

Is Elasticsearch the correct choice for this use case?

I suspect this can be modelled in a number of different ways and implemented using a variety of data stores, of which Elasticsearch certainly is one. However, I do not know how the performance or efficiency of a solution based on Elasticsearch would compare to other, not yet defined, options.

I have already invested significant resources in Elasticsearch, so I want to try Elasticsearch first before moving to the next solution. I have already described my use case, query types, and hardware specification. Can you guide me based on that? Maybe the right mapping?

It is hard to determine analytically whether your current model will or will not perform better than, e.g., the possible alternative I described earlier. If you simplify and make indexing more performant, you may at the same time make querying more expensive, and vice versa. How the balance swings may depend on the data model, cluster configuration, and hardware limitations, as well as the distribution in time of indexing and query load. Running realistic benchmarks is therefore often the only way to determine this.

It may be that someone else here in the forum has implemented something similar to what you are looking to do and can provide better guidance based on practical experience, but I would need to resort to benchmarking to be sure.

In my application, the Spark code merges the existing data in Elasticsearch (if any) with the new incoming data per deviceID. Currently I use upsert. Since I am already taking care of the merge in my code, can I use an index request instead of upsert? Will it improve performance?
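
At the bulk API level, that would mean switching from the update/doc_as_upsert action shown earlier to a plain index action that simply overwrites the whole document with whatever Spark has merged, something like this (IDs and fields are made up):

POST /Index2/parent_list/_bulk
{ "index": { "_id": "device_1" } }
{ "other_ID": "abc-123", "parents": [ { "parent_id": 42, "childs": [ { "child_id": 7, "timestamp": 1483430400000, "is_deleted": false } ] } ] }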
