Huge concurrent data ingestion to Elasticsearch

Hi guys, I've encountered an issue with Elasticsearch. We use Logstash to listen on a TCP connection, then filter and output. We have only a few dissect filters in Logstash, to keep its processing speed up.
Our Logstash settings are:

pipeline.batch.size: 2500
pipeline.batch.delay: 5
pipeline.workers: 24
pipeline.output.workers: 24
config.reload.automatic: true
config.reload.interval: 100s

The Logstash server has a 32-core CPU and 96 GB of memory.

To relieve pressure on Logstash, we moved almost all of the parsing code to an ES ingest node. On that ingest node we have some processors that do calculations and apply regular expressions to our data. There is also a master/data node that forms the ES cluster together with it. The ingest node is used only for ingest (data and master roles disabled), and the data node is the opposite.
The ingest node has a 4 GB JVM heap and the data node has a 16 GB heap. They are on different servers, each with 96 GB of memory. The refresh_interval is 180 seconds for every index.
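For reference, the role split is roughly the following in each node's elasticsearch.yml (a sketch of the 6.x-style settings; only the role flags are shown):

# elasticsearch.yml on the ingest-only node (sketch)
node.master: false
node.data: false
node.ingest: true

# elasticsearch.yml on the data/master node (sketch)
node.master: true
node.data: true
node.ingest: false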
Now the issue: we want to send 700K events over the TCP connection every minute. After several intervals, 8 or 9 of them, Elasticsearch is no longer able to receive data. When I query _cat/indices, the disk storage no longer changes, and the data generator (another app, not a Logstash input plugin) crashes. However, I cannot see any errors in either the ES or LS logs. Does anyone have ideas on how to improve the ingestion behavior of ES and LS? I much appreciate your help!

Do you have monitoring installed for Elasticsearch? What is the full specification of the Elasticsearch data node(s), e.g. CPU cores and type of storage? How many indices/shards are you actively indexing into? What is the average size of your documents? Is there anything else running on the hosts?

Hi Christian, thanks for your reply.

  • We do have monitoring tools such as ElasticHQ and cerebro installed on our servers. However, I can hardly find the root cause; all the metrics look fine to me.
  • We have two servers: one runs Logstash, Kibana and the ingest node, the other runs the data node only. Both servers have 96 GB of memory and 32 cores. The storage is a normal hard disk, no RAID I guess.
  • There is only one index being actively indexed into, with 5 shards. Each document in this index has fewer than 20 fields. See the _cat/indices output below for an idea of the document size.
elastic@node011:~/elasticsearch-6.1.0$ curl node012:9200/_cat/indices
yellow open .elastichq                               DZsFO0BUSNOckwp8Bg6T1g 5 1      1 0     7kb     7kb
yellow open .kibana                                  O4fgIRL-SiKSlWSPR9vmQQ 1 1    325 5 276.6kb 276.6kb
yellow open rawType-host15-20180730 CuM6rCtmSlCRZQpwQoNaiA 5 1 485733 0 315.7mb 315.7mb
yellow open rawType-host15-20180731 28TF8H_6Si6nrwUlX7R_pw 5 1 115201 0  72.3mb  72.3mb

I enabled debug logging in Logstash; it seems Logstash is still able to receive data from the TCP connection, but the index size in ES no longer increases.

What does iostat give while you are indexing? Anything in the logs? Are your mappings static?
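For example, something along these lines on the data node while the load is running (plain sysstat, nothing Elasticsearch-specific; watch the device that holds the Elasticsearch data path):

# extended device statistics every 5 seconds
iostat -x 5
# %util close to 100 and high await on the data disk would point at disk I/O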

I've not had a chance to run iostat yet; I'll try it tomorrow.
But from basic monitoring, CPU and memory look OK (top, free & vmstat).
There are no logs in either ES or LS when the log level is info. We use a template to make sure there is no dynamic mapping, which also lets us avoid "convert" in Logstash. By the way, I added "pool_max" to the ES output plugin and raised it to 10000; I don't know whether it improves performance or not. Do you need other details? Thanks in advance.
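The template is roughly the sketch below (field list trimmed; the template name and the mapping type name "doc" are placeholders rather than our exact ones):

curl -XPUT 'node012:9200/_template/rawtype_template' -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["rawType-*"],
  "settings": {
    "number_of_shards": 5,
    "refresh_interval": "180s"
  },
  "mappings": {
    "doc": {
      "dynamic": "strict",
      "properties": {
        "host":       { "type": "keyword" },
        "sourceType": { "type": "keyword" },
        "@timestamp": { "type": "date" }
      }
    }
  }
}'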

That will allow Elasticsearch to queue up more data, which will use more heap, but is unlikely to speed up ingestion. See this blog post for details and a discussion.

OK, got it. But previously I kept it at the default and it still failed to get data into the data node. Do you think we should put the ingest node and the data node on the same server to save the cost of network communication?

I read through your link; it mentions "rejected events". When we run some heavy queries using the HighLevelRestClient, I do indeed see "rejected events" in the ES and LS logs, indicating that the _bulk queue was full and some requests were denied. But in this case I see nothing in my logs, and there are no queued events in the ElasticHQ dashboard.
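(For reference, rejections can also be checked directly with the _cat thread pool API, something like the call below; the bulk/write pool is the interesting one.)

curl 'node012:9200/_cat/thread_pool?v&h=node_name,name,active,queue,rejected'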

@Christian_Dahlqvist as you suggested, today we have better monitoring on Elastic 6.3.0 (previously 6.1.0). I gave both ES and LS 32 GB of memory. ES shows only a light workload, but Logstash used up its memory within about 10 minutes. Then our application threw an exception and the data stopped increasing. Do you have other ideas?
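For reference, the heap on both sides is controlled through each product's config/jvm.options, along the lines of:

# config/jvm.options (both Elasticsearch and Logstash ship one); sketch of the 32 GB setting
-Xms32g
-Xmx32g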

What does your Logstash config look like? Logstash should not use that much heap even if you have a large batch size and a lot of worker threads. I would recommend enabling debug logging in Logstash to see if anything surfaces.
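Debug logging can be switched on either in logstash.yml or from the command line, roughly:

# logstash.yml
log.level: debug

# or as a command-line flag
bin/logstash --log.level=debug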

We have nearly 20 config files in one folder. Each config file has a dissect filter to parse the "plain text" into JSON fields. Previously we used the "csv" filter, but it seemed to perform worse than "dissect". Generally, we have a "split" filter to split on newlines, "mutate" to convert some fields to lowercase, and "date" for timestamp matching. Here is a sample:

filter {
    split {
        field => "message"
    } 
    mutate {
        lowercase => [ "host", "sourceType" ]
        add_field => {
            "SYSPLEX_SYSTEM" => "%{sysplexName}.%{systemName}"
        }
    }
}

filter {
    if [sourceType] != "syslog-console" {
        date {
            match => ["TIMESTAMP", "yyyy-MM-dd HH:mm:ss.SSSSSS"]
            timezone => "%{timeZone}"
            target => "@timestamp"
        }
    }
}

filter {
    if [sourceType] == "special_type"  {
        if [message] {
            dissect {
                mapping => {
                    "message" => "%{},%{},%{TIMESTAMP},%{MVS_SYSTEM_ID},%{DB2_SYSTEM_ID},%{PRIMARY_AUTH_ID},%{END_USER_USERID},%{END_USER_TRAN},%{END_USER_WORKSTN},%{SSID},%{AUTHID},%{CORRELATIONID},%{PLAN},%{LOCK_LATCH_SEC},%{LOCK_LATCH_EVENTS},%{IRLM_LOCKLATCH_SEC},%{DB2_LATCH_SEC},%{ELAPSED_SEC_CL1},%{CP_CPU_SEC_CL1},%{COMMIT_COUNT},%{ABORT_COUNT},%{SQLCALL_SEC_INSP},%{UDF_REQUESTS_SEC},%{IO_WAIT_SEC},%{BP4K_GETPAGE},%{BP32_GETPAGE},%{BP8K_GETPAGE},%{BP16_GETPAGE}"
                }
            }
            mutate {
                add_field => { "tags" => "pipeline" }
            }
        }
        else {
            mutate {
                add_field => { "tags" => "no_message" }
            }
        }
    }
}

I also enabled debug logging, which produced a large nohup log, but not much useful info. Our scenario is sending a lot of data over the TCP connection in a short time, repeating at a one-minute interval. Each interval may carry 700K events. The data size is about 0.6 KB per event (index storage size / doc count), so each interval is roughly 700K × 0.6 KB ≈ 420 MB of indexed data; a single JSON document copied from Kibana is about 4 KB on the filesystem.

@Christian_Dahlqvist by the way, I tried several values of batch size × workers in Logstash, from 8000 up to 36000 events in flight at once. It seems to have no impact on performance; Logstash was always busy.

I was suggesting decreasing it rather than increasing it. Larger values do not necessarily give better performance.

@Christian_Dahlqvist Thanks for your reply. As we tested, the bottleneck is the output to ES. If we just change the output to a file or stdout, then it works. Any ideas to optimize ES bulk requests? I cannot use flush_size in LS 6.0 and above.
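For reference, the test swap looks roughly like the sketch below; the host and the ingest pipeline name are placeholders for our real ones:

output {
  # throughput test: replace the Elasticsearch output with a cheap sink
  # stdout { codec => dots }
  # file { path => "/tmp/ls-throughput-test.log" }

  elasticsearch {
    hosts => ["node011:9200"]    # the ingest node in our setup
    pipeline => "raw_parse"      # placeholder ingest pipeline name
    http_compression => true     # trade some CPU for less data on the wire
    # pool_max => 10000          # what we had raised earlier, as mentioned above
  }
}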

Can we still continue the discussion here, or should I post a new topic? :smiley:

You will need to look at the Elasticsearch cluster and try to find what is limiting performance. Is it CPU or memory on the ingest node? Is it CPU or perhaps disk I/O on the data nodes?
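Hot threads and node stats are usually a good starting point, for example:

# what each node is busy doing right now
curl 'node012:9200/_nodes/hot_threads'

# CPU, heap, disk and thread pool pressure per node
curl 'node012:9200/_nodes/stats/os,jvm,fs,thread_pool?pretty'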

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.