Bulk is too slow

Hi all,
I want to use Elasticsearch to store and search web logs in real time, and I use the Python API to bulk insert into Elasticsearch.
I'm trying to bulk insert batches of 1000 documents into Elasticsearch using a predefined mapping, yet each bulk insert takes roughly 1 second. Any ideas on how to improve bulk performance?
Elasticsearch configuration
network.host: 0.0.0.0
indices.memory.index_buffer_size: "1g"
bootstrap.memory_lock: true

JVM
-Xms8g
-Xmx8g

Index mapping
PUT protocol
{
  "settings": {
    "index": {
      "number_of_shards": 5,
      "number_of_replicas": 0,
      "translog.flush_threshold_size": "1gb",
      "translog.durability": "async"
    }
  },
  "mappings": {
    "http": {
      "properties": {
        "timestamp":    { "type": "date",      "index": true },
        "asset_keys":   { "type": "keyword",   "index": true },
        "skey":         { "type": "keyword",   "index": true },
        "dkey":         { "type": "keyword",   "index": true },
        "uid":          { "type": "keyword",   "index": true },
        "src_ip":       { "type": "keyword",   "index": true,  "copy_to": "protocol_all" },
        "src_port":     { "type": "keyword",   "index": true,  "copy_to": "protocol_all" },
        "dst_ip":       { "type": "keyword",   "index": true,  "copy_to": "protocol_all" },
        "dst_port":     { "type": "keyword",   "index": true,  "copy_to": "protocol_all" },
        "method":       { "type": "keyword",   "index": true,  "copy_to": "protocol_all" },
        "URI":          { "type": "keyword",   "index": false, "copy_to": "protocol_all" },
        "url":          { "type": "keyword",   "index": false, "copy_to": "protocol_all" },
        "protocol":     { "type": "keyword",   "index": false, "copy_to": "protocol_all" },
        "host":         { "type": "keyword",   "index": false, "copy_to": "protocol_all" },
        "User-Agent":   { "type": "keyword",   "index": true,  "copy_to": "protocol_all" },
        "Cookie":       { "type": "keyword",   "index": false, "copy_to": "protocol_all" },
        "username":     { "type": "keyword",   "index": false, "copy_to": "protocol_all" },
        "password":     { "type": "keyword",   "index": false },
        "status_code":  { "type": "integer",   "index": false },
        "request":      { "type": "object",    "enabled": false },
        "response":     { "type": "object",    "enabled": false },
        "sensor_ip":    { "type": "keyword",   "index": true,  "copy_to": "protocol_all" },
        "location":     { "type": "geo_point", "index": false },
        "city":         { "type": "keyword",   "index": false },
        "country":      { "type": "keyword",   "index": false },
        "raw_data":     { "type": "object",    "enabled": false },
        "protocol_all": { "type": "keyword",   "index": true },
        "es_type":      { "type": "keyword",   "index": false }
      }
    }
  }
}

Each bulk request contains about 1000 documents of roughly 10 KB each, so around 10 MB per request.

Which version are you using? What is the specification of your Elasticsearch cluster (number of data nodes, CPU, RAM, heap size and type of storage)? How large are your events? How many parallel indexing threads do you have running? What indexing throughput are you seeing?

My Elasticsearch version is 5.6.2, and my cluster has just one node.
CPU: Intel(R) Xeon(R) CPU E5-2630 v4 @ 2.20GHz * 2
RAM: 64G
JVM: -Xms8g -Xmx8g -Xss1m

Each event is roughly 8-15 KB.

I use 20 threads to insert, and in Kibana I see the protocol index rate is about 1000 documents/s.

So your events are between 8kB and 15kB in size and you send bulk requests of 1000 at a time?

How many shards are you actively indexing into? What type of storage do you have? Do you see any issues in the Elasticsearch logs, e.g. long GC? What does CPU usage and disk I/O and iowait look like on the node?

Yes, 1000 documents of 8-15 KB each per bulk request, and I connect to Elasticsearch using 127.0.0.1:9200.

"number_of_shards": 5,
"number_of_replicas": 0,

My storage is a 1 TB SATA disk, and there is nothing unusual in the Elasticsearch logs.

iostat
Linux 3.10.0-693.2.2.el7.x86_64 (master.prs) 11/13/2017 x86_64 (20 CPU)

avg-cpu: %user %nice %system %iowait %steal %idle
1.11 0.00 0.10 0.23 0.00 98.57

Device: tps kB_read/s kB_wrtn/s kB_read kB_wrtn
sda 15.05 24.43 799.68 6416090 210045971
dm-0 13.86 24.35 799.61 6395555 210026177
dm-1 0.00 0.02 0.00 5340 0

avg-cpu: %user %nice %system %iowait %steal %idle
10.05 0.00 0.60 2.80 0.00 86.55

Device: tps kB_read/s kB_wrtn/s kB_read kB_wrtn
sda 51.00 0.00 13329.00 0 13329
dm-0 46.00 0.00 13329.00 0 13329
dm-1 0.00 0.00 0.00 0 0

avg-cpu: %user %nice %system %iowait %steal %idle
10.65 0.00 0.50 2.05 0.00 86.80

Device: tps kB_read/s kB_wrtn/s kB_read kB_wrtn
sda 44.00 0.00 13372.50 0 13372
dm-0 42.00 0.00 13372.50 0 13372
dm-1 0.00 0.00 0.00 0 0

avg-cpu: %user %nice %system %iowait %steal %idle
8.16 0.00 0.95 1.80 0.00 89.09

Device: tps kB_read/s kB_wrtn/s kB_read kB_wrtn
sda 38.00 8.00 220.50 8 220
dm-0 29.00 8.00 220.50 8 220
dm-1 0.00 0.00 0.00 0 0

avg-cpu: %user %nice %system %iowait %steal %idle
17.91 0.00 0.75 2.95 0.00 78.39

Device: tps kB_read/s kB_wrtn/s kB_read kB_wrtn
sda 53.00 0.00 13397.50 0 13397
dm-0 47.00 0.00 13397.50 0 13397
dm-1 0.00 0.00 0.00 0 0

mpstat
Linux 3.10.0-693.2.2.el7.x86_64 (master.prs) 11/13/2017 x86_64 (20 CPU)

05:01:57 PM CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
05:01:58 PM all 9.30 0.00 0.55 2.85 0.00 0.05 0.00 0.00 0.00 87.25
05:01:59 PM all 14.30 0.00 0.45 1.40 0.00 0.00 0.00 0.00 0.00 83.85
05:02:00 PM all 5.80 0.00 0.70 1.10 0.00 0.00 0.00 0.00 0.00 92.40
05:02:01 PM all 12.07 0.00 0.50 2.40 0.00 0.00 0.00 0.00 0.00 85.03
05:02:02 PM all 11.56 0.00 0.70 2.15 0.00 0.00 0.00 0.00 0.00 85.59
05:02:03 PM all 9.10 0.00 0.50 1.95 0.00 0.00 0.00 0.00 0.00 88.44
05:02:04 PM all 10.56 0.00 0.65 3.10 0.00 0.00 0.00 0.00 0.00 85.69
05:02:05 PM all 11.26 0.00 0.60 0.00 0.00 0.00 0.00 0.00 0.00 88.14
05:02:06 PM all 11.31 0.00 0.60 2.70 0.00 0.00 0.00 0.00 0.00 85.39

Average: all 10.58 0.00 0.58 1.96 0.00 0.01 0.00 0.00 0.00 86.87

According to iostat and mpstat, I can't figure out what is limiting bulk performance.

As you can see, CPU idle is above 80%, disk I/O is only about 13 MB/s, and RAM usage is just 2-3 GB according to Kibana.

Have you followed the instructions in the documentation, especially around increasing the refresh_interval? Do you have X-Pack monitoring installed?
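For illustration, a minimal sketch of how the refresh interval could be relaxed on your existing index with the Python client you already use (the "30s" value is only an example):

from elasticsearch import Elasticsearch

es = Elasticsearch('127.0.0.1:9200')
# Refresh less often so fewer, smaller segments are created per second.
# The default "1s" gives near-real-time search; larger values trade search
# freshness for indexing throughput.
es.indices.put_settings(index='protocol',
                        body={'index': {'refresh_interval': '30s'}})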

Yes, but I can't increase the refresh_interval because I need to search in real time. And I installed X-Pack monitoring, so I can see the index rate in Kibana.

I already disabled swap and set indices.memory.index_buffer_size: "1g"

What indexing throughput are you seeing? What indexing throughput do you need to achieve? How much data are you indexing each day and how long are you keeping it?


I don't know how much data there will be each day; it depends on website traffic. And I need to keep it for at least 1 month.

Where is the data coming from?

How many log entries are you generating per second?

What does your ingest process look like?

How come you are not using time-based indices?
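By time-based indices I mean writing each day's logs to a separate index, so that your one-month retention becomes a matter of deleting old indices. A rough sketch, assuming daily indices derived from the millisecond timestamps your documents already carry (the helper name is hypothetical):

from datetime import datetime

def daily_index(ts_millis):
    # e.g. 1510568517000 -> "protocol-2017.11.13"
    day = datetime.utcfromtimestamp(ts_millis / 1000.0)
    return 'protocol-' + day.strftime('%Y.%m.%d')

Each bulk action would then use "_index": daily_index(doc["timestamp"]) instead of the fixed "protocol" index.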

The screenshot I uploaded is not from real web logs; I use Python to generate source data and insert it into Elasticsearch, so we don't need to worry about where the data comes from.
Here is my code to test bulk indexing. Is there anything wrong with it?

#!/usr/bin/env python2.7
# -*- coding:utf-8 -*-

import time
import traceback
import multiprocessing
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

def es_data():
    for i in xrange(1000):
        try:
            yield {
                "_index": "protocol",
                "_type": "http",
                "_source": {
                    "protocol": "http",
                    "uid": "C3ttfj3cobPd3RtZHl",
                    "status_code": 200,
                    "sensor_ip": "10.0.*.*",
                    "city": "",
                    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Safari/604.1.38",
                    "src_ip": "10.0.*.*",
                    "method": "GET",
                    "es_type": "http",
                    "timestamp": int(time.time() * 1000),
                    "URI": "/editor_styles/styles?inajax=1&page=2&filter=.tagtpl-226&sort=undefined",
                    "host": "www.test.com",
                    "src_port": "54753",
                    "response": {
                        "body":  body,
                        "header": {
                            "Access-control-allow-methods": "GET,POST,OPTIONS",
                            "End": "Y8",
                            "X-powered-by": "PHP/7.0.7",
                            "Transfer-encoding": "chunked",
                            "Set-cookie": "SERVERID=393aa09ade25525598196ea9e53fbf93|1508152416|1508151243;Path=/",
                            "Content-encoding": "gzip",
                            "Vary": "Accept-Encoding",
                            "Connection": "keep-alive",
                            "Date": "Mon, 16 Oct 2017 11:13:37 GMT",
                            "Access-control-allow-origin": "*",
                            "Access-control-allow-headers": "X-Requested-With",
                            "Content-type": "text/html; charset=UTF-8"
                        },
                        "response_line": "HTTP/1.1 200 OK"
                    },
                    "url": "www.test.com/editor_styles/styles?inajax=1&page=2&filter=.tagtpl-226&sort=undefined",
                    "country": "中国",
                    "request": {
                        "username": "",
                        "body": "",
                        "host": "www.test.com",
                        "request_line": "GET /editor_styles/styles?inajax=1&page=2&filter=.tagtpl-226&sort=undefined HTTP/1.1",
                        "header": {
                            "Accept-language": "zh-cn",
                            "Accept-encoding": "gzip, deflate",
                            "Connection": "keep-alive",
                            "Accept": "text/html, */*; q=0.01",
                            "User-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Safari/604.1.38",
                            "Dnt": "1",
                            "Host": "www.test.com",
                            "X-requested-with": "XMLHttpRequest",
                            "Referer": "http://www.test.com/"
                        },
                        "body_len": 0,
                        "password": "",
                        "range_request": False,
                        "method": "GET"
                    },
                    "dst_port": "80",
                    "dst_ip": "120.*.*.*"
                }
            }
        except:
            traceback.print_exc()


def write():
    while True:
        es = Elasticsearch('127.0.0.1:9200')
        start = time.clock()
        success, _ = bulk(es, es_data())
        end = time.clock()
        print end - start


def do_work():
    for i in xrange(50):
        process = multiprocessing.Process(target=write())
        process.start()

if __name__ == '__main__':
    do_work()

OK, so you are running a simple benchmark and do not yet have any real data?

About the benchmarking script; are you actually creating a new instance of the client for each bulk request? That is quite inefficient. You should generally create the client at the start and then reuse it, so that you can benefit from persistent connections.
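A minimal sketch of the change I mean, based on your write() function (create the client once per worker, outside the loop, so its persistent connections are reused):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

def write():
    # One client per worker; its connection pool keeps HTTP connections
    # open across bulk requests instead of reconnecting every time.
    es = Elasticsearch('127.0.0.1:9200')
    while True:
        success, _ = bulk(es, es_data())  # es_data() as defined in your script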

If you just want to benchmark, and make sure you do so efficiently, you can create a large data file with a single document in JSON format per line and then use e.g. our benchmarking tool Rally to run it against the cluster. It may be even more interesting to use what you will be using in production, e.g. Filebeat and/or Logstash.
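For example, a small helper like this (the file name is just an illustration) could dump your generated documents as one JSON object per line:

import json

# Write one document per line so the file can be used as a benchmark corpus.
with open('http_docs.json', 'w') as f:
    for action in es_data():  # es_data() from your script
        f.write(json.dumps(action['_source']) + '\n')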

Yes, my fault, I made a mistake: I was calling the function when creating the Process (target=write()) instead of passing it, so the bulk loop ran in a single process. That was the problem.

Now the bulk performance has improved, and I'm going to use Rally to benchmark Elasticsearch; I think it will be helpful.

Thanks a lot for the advice that solved my problem. If I have other problems, I may bother you again, so thanks again!

Hi Christian,
I fixed my bug and retested it. Now the index rate I see in Kibana (via the X-Pack plugin) is 3000-4000 documents per second.

That bulk rate is still too low for me, and I want to improve it to at least 10000 documents per second.

Any advice?

Have you tried increasing the indexing thread count? Did you try using Rally to make sure you get similar results that way and it is not your benchmarking script that is the bottleneck?

Yes, I increased the thread count from 20 to 50, but the index rate has not improved; it is still 3000-4000 documents per second.

Here is part of the Rally report:
|------:|-------------------------------:|-----------------------:|------------:|-------:|
| All | Indexing time | | 43.8547 | min |
| All | Merge time | | 9.91193 | min |
| All | Refresh time | | 4.07887 | min |
| All | Flush time | | 2.47258 | min |
| All | Merge throttle time | | 2.05618 | min |
| All | Median CPU usage | | 450.5 | % |
| All | Total Young Gen GC | | 258.91 | s |
| All | Total Old Gen GC | | 14.007 | s |
| All | Index size | | 3.28999 | GB |
| All | Totally written | | 20.149 | GB |
| All | Heap used for segments | | 18.5363 | MB |
| All | Heap used for doc values | | 0.104549 | MB |
| All | Heap used for terms | | 17.32 | MB |
| All | Heap used for norms | | 0.0740967 | MB |
| All | Heap used for points | | 0.222055 | MB |
| All | Heap used for stored fields | | 0.815544 | MB |
| All | Segment count | | 97 | |
| All | Min Throughput | index-append | 26787.3 | docs/s |
| All | Median Throughput | index-append | 27315.4 | docs/s |
| All | Max Throughput | index-append | 28426 | docs/s |
| All | 50th percentile latency | index-append | 894.98 | ms |
| All | 90th percentile latency | index-append | 2498.99 | ms |
| All | 99th percentile latency | index-append | 5972.18 | ms |
| All | 99.9th percentile latency | index-append | 13505.2 | ms |
| All | 100th percentile latency | index-append | 15106.5 | ms |
| All | 50th percentile service time | index-append | 894.98 | ms |
| All | 90th percentile service time | index-append | 2498.99 | ms |
| All | 99th percentile service time | index-append | 5972.18 | ms |
| All | 99.9th percentile service time | index-append | 13505.2 | ms |
| All | 100th percentile service time | index-append | 15106.5 | ms |
| All | error rate | index-append | 0 | % |

The report seems to indicate a higher throughput. Is this benchmark not based on your data?

I don't know whether this benchmark is based on my data; I just ran the command esrally --distribution-version=5.6.2

And I found that my benchmark script sometimes gets a timeout error, even though I connect to Elasticsearch via 127.0.0.1:9200. Why does this error occur?

Unless you have created a custom track, it is not based on your data.

If you run Rally that way it will run the geonames track. This indexes documents with the following structure:

{
  "geonameid": 2986043,
  "name": "Pic de Font Blanca",
  "asciiname": "Pic de Font Blanca",
  "alternatenames": "Pic de Font Blanca,Pic du Port",
  "feature_class": "T",
  "feature_code": "PK",
  "country_code": "AD",
  "admin1_code": "00",
  "population": 0,
  "dem": "2860",
  "timezone": "Europe/Andorra",
  "location": [1.53335, 42.64991]}

This is a lot smaller than your documents and not really comparable; smaller documents are expected to be faster to index. The track also by default only uses 8 clients for indexing. You could however potentially modify your script to index records of this type, and configure Rally to use the same number of clients as your script uses threads. That would allow you to compare the two directly.
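As a rough, hypothetical sketch, your generator could be switched to geonames-style documents (the index and type names here are made up) so both tools index comparable data:

def geonames_like_docs(n=1000):
    # Much smaller documents, similar in shape to the geonames track above.
    for i in xrange(n):
        yield {
            "_index": "geonames_test",
            "_type": "doc",
            "_source": {
                "geonameid": i,
                "name": "Pic de Font Blanca",
                "feature_class": "T",
                "country_code": "AD",
                "population": 0,
                "timezone": "Europe/Andorra",
                "location": [1.53335, 42.64991]
            }
        }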

You could also create a custom track, but that would probably require more work.