Redis - Logstash - Elasticsearch is not as fast as I expected

Hi,

I'm building a near-real-time data pipeline with Redis, Logstash and Elasticsearch.
I need to store about 2 million documents every minute, which is turning out to be harder than I expected.
So I'd like to get some advice on my setup.

The pipeline consists of the following stages.

  1. Python code sends HTTP API requests to multiple sites, then fetches and formats the data.
    I use django-celery and RabbitMQ.
    This stage has never been the bottleneck in any of my tests.

  2. The servers from stage 1 push the formatted data to Redis (a simplified sketch of this step is below).
    The volume is more than 2 million documents/min (~30,000/sec).

  3. Logstash reads the data from Redis and sends it to Elasticsearch.

Redis and Logstash are installed on the same server (an AWS EC2 instance).
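
The producer side looks roughly like this. It is a heavily simplified sketch rather than the actual celery task code, and the redis-py connection details are placeholders; the field names are the ones the Logstash filter further down expects:

# Simplified sketch of stage 2: push one formatted document onto the Redis
# list that the Logstash redis input reads (data_type => "list", key => "logstash").
import json
import redis

r = redis.Redis(host="localhost", port=6379, password="...")

def push_document(index_name, doc_id, document):
    # `document` is the already-formatted dict from the HTTP API and should
    # contain a "timestamp" field for the date filter to parse.
    event = {
        "_index": index_name,               # becomes [@metadata][_index] in Logstash
        "_id": doc_id,                      # becomes [@metadata][_id] in Logstash
        "_document": json.dumps(document),  # expanded again by the json filter
    }
    # RPUSH appends to the list; the Logstash redis input consumes from it.
    r.rpush("logstash", json.dumps(event))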

I've run the test several times under different settings, but throughput is lower than I expected and documents eventually pile up in the Redis server.

I suspect Logstash becomes the bottleneck once I increase the number of Elasticsearch nodes:
with several nodes the write queue stayed almost empty during the test, whereas it didn't when I used only 1 node.
With only 1 node, the write queue was always full and es_rejected_execution_exception occurred frequently.

Is there any recommended way to increase the processing rate, other than increasing the spec of each server?

--- What I've done for testing ---

  • [ES] 8GB, 1 node + [AWS] m5.large (8GB RAM): almost 2,500 documents/sec
  • [ES] 58GB, 2 nodes + [AWS] m5.large (8GB RAM): almost 5,000 documents/sec
  • [ES] 58GB, 2 nodes + [AWS] m5.xlarge (16GB RAM): almost 10,000 documents/sec
  • [ES] 58GB, 3 nodes + [AWS] m5.xlarge (16GB RAM): almost 15,000 documents/sec
  • [ES] 58GB, 3 nodes + [AWS] m5.2xlarge (32GB RAM): almost 20,000 documents/sec

--- logstash config ---

input {
    # Consume JSON events from the Redis list that the producers push to.
    redis {
        host => "localhost"
        port => 6379
        codec => json {}
        data_type => "list"
        key => "logstash"
        password => "${REDIS_PWD}"
    }
}

filter {
    # Move the routing fields out of the event and into @metadata,
    # so they are used for routing but not indexed as part of the document.
    mutate {
        rename => ["_index", "[@metadata][_index]"]
        rename => ["_id", "[@metadata][_id]"]
    }
    # Expand the JSON payload carried in the _document field into the event.
    json {
        source => "_document"
    }
    # Parse the document's own timestamp in place.
    date {
        match => ["timestamp", "ISO8601", "UNIX_MS", "UNIX"]
        target => "timestamp"
    }
    # Drop the raw payload and the ingest-time @timestamp.
    mutate {
        remove_field => ["_document", "@timestamp"]
    }
}

output {
    # Route each event to the index and document id supplied by the producer.
    elasticsearch {
        hosts => [<myhost>]
        user => "elastic"
        password => <mypassword>
        index => "%{[@metadata][_index]}"
        document_id => "%{[@metadata][_id]}"
    }
}

Elasticsearch is often limited by disk I/O. What type and size of EBS are you using for the different scenarios?

What does EBS mean? Is it the storage of AWS EC2?
I forgot to mention that I'm using Elastic Cloud for Elasticsearch and Kibana, not my own EC2 servers.
The Elasticsearch instance type is aws.data.highio.i3.

Elastic Cloud uses local SSD storage with very good I/O, so it is less likely that this is the bottleneck. As you are specifying document_id in the output, Elasticsearch needs to check whether the document already exists before indexing it, which is significantly slower than letting Elasticsearch assign the ID automatically.

Have you enabled monitoring for your Elasticsearch cluster? Have you enabled monitoring for Logstash? What is the size of your documents? How many indices and shards are you writing into? Have you optimized your mappings or are these dynamic?

First of all, thanks for your reply.

Regarding the document ID: I would really like to use automatically assigned IDs.
But when I receive documents from the API requests, some of them overlap with the previous request.
So I assign the ID myself to get rid of these duplicate documents; for that to work, the same document always has to map to the same _id.
I assumed this would be faster than searching for an existing copy and deciding whether a document is a duplicate.
Please let me know if I'm wrong or if there is a better way.
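
Concretely, the idea is something like the sketch below. It is not my real code, and the fields used for the key are just an example based on my templates further down:

# Sketch: derive a deterministic _id from the fields that identify a document,
# so the same document fetched twice always gets the same Elasticsearch _id
# and the second write overwrites the first instead of creating a duplicate.
import hashlib

def make_doc_id(document):
    # example key fields; the real identifying fields may differ per index
    key = "|".join(str(document[f]) for f in ("exchange", "symbol", "timestamp"))
    return hashlib.sha1(key.encode("utf-8")).hexdigest()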

I enabled monitoring for both the Elasticsearch cluster and Logstash to find out where the bottleneck is.
The size of each document varies widely, from about 100 bytes to 20 KB.
I'm writing into 45 indices in total, each with 1 shard and 1 replica.
(I've never changed the shard and replica settings, so I think these are the defaults.)
Also, I'm not using dynamic mappings. I made templates for the indices, and the type of each field is explicitly set.

While reading the link you gave, I started to think my mappings may not be fully optimized.
I'll check them and make changes according to the link later.
But before that, please let me know if there is another way, based on the new information above.

--- my templates ---

{
  "<template1>" : {
    "order" : 0,
    "index_patterns" : [
      "<type1>-*"
    ],
    "settings" : { },
    "mappings" : {
      "_size" : {
        "enabled" : true
      },
      "properties" : {
        "volume" : {
          "type" : "float"
        },
        "symbol" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "ignore_above" : 256,
              "type" : "keyword"
            }
          }
        },
        "high" : {
          "type" : "float"
        },
        "low" : {
          "type" : "float"
        },
        "exchange" : {
          "type" : "keyword"
        },
        "type" : {
          "type" : "keyword"
        },
        "close" : {
          "type" : "float"
        },
        "open" : {
          "type" : "float"
        },
        "timestamp" : {
          "type" : "date"
        }
      }
    },
    "aliases" : { }
  },
  "<template2>" : {
    "order" : 0,
    "index_patterns" : [
      "<type2>-*"
    ],
    "settings" : { },
    "mappings" : {
      "_size" : {
        "enabled" : true
      },
      "properties" : {
        "symbol" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "ignore_above" : 256,
              "type" : "keyword"
            }
          }
        },
        "asks" : {
          "properties" : {
            "quantity" : {
              "type" : "float"
            },
            "price" : {
              "type" : "float"
            }
          }
        },
        "bids" : {
          "properties" : {
            "quantity" : {
              "type" : "float"
            },
            "price" : {
              "type" : "float"
            }
          }
        },
        "exchange" : {
          "type" : "keyword"
        },
        "type" : {
          "type" : "keyword"
        },
        "timestamp" : {
          "type" : "date"
        }
      }
    },
    "aliases" : { }
  },
  "<template3>" : {
    "order" : 0,
    "index_patterns" : [
      "<type3>-*"
    ],
    "settings" : { },
    "mappings" : {
      "_size" : {
        "enabled" : true
      },
      "properties" : {
        "symbol" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "ignore_above" : 256,
              "type" : "keyword"
            }
          }
        },
        "amount" : {
          "type" : "float"
        },
        "price" : {
          "type" : "float"
        },
        "exchange" : {
          "type" : "keyword"
        },
        "type" : {
          "type" : "keyword"
        },
        "timestamp" : {
          "type" : "date"
        }
      }
    },
    "aliases" : { }
  }
}
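
One thing I'm wondering about after re-reading these templates: every symbol field is mapped as text with a keyword sub-field. If symbol is only ever used for exact matches and aggregations, a keyword-only mapping would avoid analyzing it on every document, something like this (just an idea, not tested yet):

"symbol" : {
  "type" : "keyword",
  "ignore_above" : 256
}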

Try increasing the internal Logstash batch size a bit and see if this helps. Do so gradually, as you do not want your bulk requests to get too large. Using an external document id is the best way to avoid duplicates, but it does add some overhead. Large documents also take longer to parse and process, so this always needs to be considered when judging whether a certain throughput level is good or not.
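
For reference, those knobs live in logstash.yml (or can be passed as the -b and -w command-line flags). The values below are only illustrative starting points, not recommendations for your workload:

# logstash.yml -- pipeline tuning (illustrative values, adjust gradually)
pipeline.workers: 8        # defaults to the number of CPU cores
pipeline.batch.size: 500   # max events a single worker collects before flushing (default 125)
pipeline.batch.delay: 50   # ms to wait for more events before flushing a smaller batch (default 50)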

Thanks for your reply.

Okay, then I'll try increasing the Logstash batch size and do some other tuning work.
After several tests, I'll append my results here.

Thank you so much!
