Elasticsearch with RabbitMQ

Hi, I am using RabbitMQ in my system (two nodes with an HA queue).
I have 2 LS-Forwarders, 2 LS-Indexers (8 GB RAM, 8 cores) and 3 nodes in ES (16 GB RAM, 16 cores).

I checked the queue in RabbitMQ and found an issue:

Incoming is 3k events/documents per second, but deliver/get is only 300 ~ 600 events/documents per second.

And after 30 minutes, I have 3M documents queued in RabbitMQ.

I don't understand why the number of documents/events for deliver/get is so small.
I tried increasing the threads in the rabbitmq output plugin in the Logstash indexer up to 90 threads, performance < 50%, but it is still inefficient.

Can you help me?

Thanks

So you mean logstash elasticsearch output is slow?

Can you give us:

  • logstash config
  • elasticsearch index settings/mapping

Did you configure ES for bulk inserts? (I mean, high refresh interval, bulk thread pool...)

@ebuidy:
This is my LS-Indexer config:

# Logstash indexer pipeline: consume events from the RabbitMQ queue
# "logstash-queue" and write them to the Elasticsearch cluster.
input {
    # Consumer connected to the broker at 10.1.6.244.
    rabbitmq {
        arguments => { "x-ha-policy" => "all" }
        host => "10.1.6.244"
        queue => "logstash-queue"
        #durable => true
        key => "logstash-key"
        exchange => "logstash-rabbitmq"
        # 120 threads per input block, i.e. 240 across the two inputs.
        threads => 120
        exclusive => false
        # NOTE(review): presumably the limit of unacknowledged messages
        # handed to a consumer at once — confirm against the plugin docs.
        prefetch_count => 512
        vhost => "ELK"
        port => 5677
        user => "logstash"
        password => "****"
    }
    # Same queue, exchange and routing key as above, but connected to the
    # broker at 10.1.6.245.
    rabbitmq {
        arguments => { "x-ha-policy" => "all" }
        host => "10.1.6.245"
        queue => "logstash-queue"
        #durable => true
        key => "logstash-key"
        exchange => "logstash-rabbitmq"
        threads => 120
        exclusive => false
        prefetch_count => 512
        vhost => "ELK"
        port => 5677
        user => "logstash"
        password => "*****"
    }
}
output {
  # Single Elasticsearch output listing three cluster hosts.
  elasticsearch {
    hosts => ["10.1.6.242:9200", "10.1.6.243:9200", "10.1.6.241:9200"]
    user => "*****"
    password => "****"
    sniffing => true
    # NOTE(review): presumably means Logstash does not install its own
    # index template, so the template must already exist in ES — confirm.
    manage_template => false
    #index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    #document_type => "%{[@metadata][type]}"
    # Index name and document type are built from the "beatname" and
    # "beattype" event fields instead of @metadata (see commented lines above).
    # NOTE(review): "xxxx.ww" looks like a week-year/week-of-year date
    # pattern, i.e. one index per week — confirm.
    index => "%{beatname}-%{+xxxx.ww}"
    document_type => "%{beattype}"
  }
}

And this is my LS-Forwarder config:

# Logstash forwarder pipeline: receive events from Beats on port 5044 and
# publish them to the RabbitMQ exchange "logstash-rabbitmq".
input {
  beats {
    port => 5044
  }
}
filter {
  # Copy the Beats @metadata values into regular event fields named
  # "beatname" and "beattype"; the indexer's elasticsearch output uses these
  # fields for the index name and document type.
  # NOTE(review): presumably needed because @metadata is not carried through
  # RabbitMQ with the event — confirm.
  mutate {
   add_field => {"beatname" => "%{[@metadata][beat]}"}
   add_field => {"beattype" => "%{[@metadata][type]}"}
  }
}
output {
    # Publisher connected to the broker at 10.1.6.244.
    # NOTE(review): two rabbitmq outputs are declared in this section, so
    # presumably every event is published to both hosts — confirm whether
    # that duplication is intended.
    rabbitmq {
        exchange => "logstash-rabbitmq"
        exchange_type => "direct"
        key => "logstash-key"
        host => "10.1.6.244"
        vhost => "ELK"
        workers => 12
        durable => true
        # NOTE(review): persistent delivery presumably makes the broker
        # write each message to disk, which may limit throughput — confirm.
        persistent => true
        port => 5677
        user => "logstash"
        password => "****"

    }
    # Same exchange and routing key as above, but connected to the broker
    # at 10.1.6.245.
    rabbitmq {
        exchange => "logstash-rabbitmq"
        exchange_type => "direct"
        key => "logstash-key"
        host => "10.1.6.245"
        vhost => "ELK"
        workers => 12
        durable => true
        persistent => true
        port => 5677
        user => "logstash"
        password => "****"

    }
}

- elasticsearch index settings/mapping

I am using an index template:

{
  "order": 0,
  "template": "fb-*",
  "settings": {
    "index": {
      "number_of_shards": "2",
      "number_of_replicas": "1",
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "template1": {
            "mapping": {
              "ignore_above": 1024,
              "index": "not_analyzed",
              "type": "{dynamic_type}",
              "doc_values": true
            },
            "match": "*"
          }
        }
      ],
      "_all": {
        "norms": {
          "enabled": false
        },
        "enabled": true
      },
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "geoip": {
          "properties": {
            "location": {
              "type": "geo_point"
            }
          }
        },
        "offset": {
          "type": "long",
          "doc_values": "true"
        },
        "message": {
          "index": "analyzed",
          "type": "string"
        }
      }
    }
  },
  "aliases": {}
}

I set refresh_interval = 5s for all index templates.

And here are the thread_pool settings in my cluster:

"thread_pool": {
        "index": {
         "type": "fixed",
         "min": 16,
         "max": 16,
         "queue_size": 200
      }
 },