Limited indexing rate from one host

Hello, I'm seeing strange ES behavior on my test cluster. The cluster has one node and a single index for logs, with 10 primary shards and no replicas. Some tuning applied to this index:

"index.refresh_interval": "30s",
"index.translog.flush_threshold_size": "2gb",
"index.translog.durability": "async",
"index.number_of_replicas": "0"

I have 3 hosts with Filebeat/Logstash sending to this index.
When I send logs from all three hosts, I can reach a stable indexing rate of around ~60K docs/sec.
BUT if I turn off 2 hosts and leave only one, the indexing rate is only ~20K docs/sec (I am doing the initial indexing of very big log files, hundreds of GB, so the rate could be much higher than 60K even from one host).
Why? Where is the limit? The logs on all hosts are almost the same.
I tried splitting one host into 2 Logstash instances (half of the logs processed by one filebeat/logstash pair and the other half by a second one), but the index rate was again 20K.
It sounds like network saturation, but the 1Gbit network interface shows <100Mbit of traffic.
I've tried different pipeline.batch.size values, from 100 to 1000, with no difference.
When all 3 hosts are working I get very many 429 errors, but the index rate is still 60K. When only 1 host works, the number of 429s is much lower, but the index rate is small too: 20K.
How can I reach 60K from one host? Thanks.

As Elasticsearch is able to reach a higher indexing rate with more clients connected, this does not sound like an issue with Elasticsearch. It rather sounds like Filebeat, or the network on the hosts where the Filebeat instances are located, is the limiting factor. I would therefore recommend looking closer at the Filebeat hosts and potentially opening a new thread under the Beats section.

Regarding the 429 rejections, I would recommend this blog post. These rejections force the clients to resend data and can affect the indexing rate. You should be able to reduce them by reducing the number of primary shards. If you are concerned that this will lead to shards that are too large, I would recommend looking into rollover and managing this through Index Lifecycle Management (ILM).
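For example, a rollover policy along these lines (the policy name and the max_size/max_age thresholds here are just illustrative values, not recommendations for your cluster):

```json
PUT _ilm/policy/logs_rollover_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "7d"
          }
        }
      }
    }
  }
}
```

With rollover in place you can use fewer primary shards per index and still keep individual shards from growing too large.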

I suppose that the limiting factor is Logstash: when I run "top -H -p <pid>" I see that two threads, "java" and "[main]<beats", each consume one full CPU core (99.9%):

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                                                                                                                              
38851 user        20   0   44,9g  25,0g  23160 R 99,9  9,9   1:03.43 java                                                                                                                                                                                                                                 
42539 user        20   0   44,9g  25,0g  23160 R 99,9  9,9   4:38.37 [main]<beats                                                                                                                                                                                                                         
39199 user        20   0   44,9g  25,0g  23160 S  9,6  9,9   0:24.51 [main]>worker17                                                                                                                                                                                                                      
39244 user        20   0   44,9g  25,0g  23160 S  8,0  9,9   0:25.43 [main]>worker60                                                                                                                                                                                                                      
39262 user        20   0   44,9g  25,0g  23160 S  8,0  9,9   0:25.85 [main]>worker76                                                                                                                                                                                                                      
39238 user        20   0   44,9g  25,0g  23160 S  7,6  9,9   0:25.14 [main]>worker54                                                                                                                                                                                                                      
39265 user        20   0   44,9g  25,0g  23160 S  7,6  9,9   0:27.06 [main]>worker79                                                                                                                                                                                                                      
39181 user        20   0   44,9g  25,0g  23160 S  7,3  9,9   0:24.80 [main]>worker1         

How can I spread the load across more cores for these threads?
I set the filebeat config to use several workers and pipelining, but nothing changes:

output.logstash:
  hosts: ["localhost:5045"]
  workers: 20
  compression_level: 0
  pipelining: 20

I still have one "java" thread eating 99.9% CPU and one "[main]<beats" thread eating 99.9% CPU.
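One idea I'm considering (just a guess on my side, the extra port numbers are arbitrary): open several beats inputs on different ports, so the input work isn't tied to a single connection thread:

```
input {
  beats { port => 5045 }
  beats { port => 5046 }
  beats { port => 5047 }
}
```

and then point Filebeat at all of them:

```yaml
output.logstash:
  hosts: ["localhost:5045", "localhost:5046", "localhost:5047"]
  loadbalance: true
```

I haven't verified yet whether this actually spreads the "[main]<beats" work across cores.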

What does your Logstash config look like?

input {
    beats {
        port => "5045"
    }
}

filter {
  # Parse the raw line; if the first layout fails, retry with an alternate
  # layout where [comp2] appears after the message.
  dissect {
    mapping => { "message" => "%{day}-%{mon} %{ts} %{loglevel} [%{comp1}] [%{comp2}] %{message}" }
  }
  if "_dissectfailure" in [tags] {
    dissect {
       remove_tag => [ "_dissectfailure" ]
       mapping => { "message" => "%{day}-%{mon} %{ts} %{loglevel} [%{comp1}] %{message} [%{comp2}] %{+message}" }
    }
  }

  # Short hostname: everything before the first dot.
  mutate {
      add_field => { "hosttmp" => "%{[host][name]}" }
  }
  mutate {
      split => ["hosttmp", "."]
      add_field => { "[@metadata][shortHostname]" => "%{[hosttmp][0]}" }
  }
  mutate {
      add_field => { "[@metadata][index_prefix]" => "%{index_prefix}" }
  }

  # Build [@metadata][logdate] as "<mon>_<day>" for the index name.
  mutate { add_field => { "[@metadata][logdate]" => "%{mon}" } }
  mutate { merge => { "[@metadata][logdate]" => "day" } }
  mutate { join => { "[@metadata][logdate]" => "_" } }

  # Reassemble "<day>-<mon> <ts>" and parse it as the event timestamp.
  mutate { merge => { "day" => "mon" } }
  mutate { join => { "day" => "-" } }
  mutate { merge => { "day" => "ts" } }
  mutate { join => { "day" => " " } }
  date {
    match => [ "day", "dd-MM HH:mm:ss:SSS" ]
  }

  # Component name: the parent directory of the log file, lowercased.
  mutate { add_field => { "compname" => "%{[log][file][path]}" } }
  mutate { gsub => [ "compname", "/[^/]+$", "" ] }
  mutate { gsub => [ "compname", ".*/", "" ] }
  mutate { lowercase => [ "compname" ] }
  mutate {
      add_field => { "comp_idx" => "%{compname}" }
  }
  mutate {
      split => ["comp_idx", "-"]
      add_field => { "[@metadata][comp_idx]" => "%{[comp_idx][0]}" }
  }

  # Drop the temporary fields.
  mutate { remove_field => [ "ts", "mon", "day", "hosttmp", "index_prefix", "comp_idx" ] }
}

output {
    elasticsearch {
        hosts => [ "elastic.host:9200" ]
        index => "%{[@metadata][index_prefix]}_%{+YYYY}_%{[@metadata][logdate]}"
    }
}