Hi,
I'm building a near real-time data pipeline with Redis, Logstash, and Elasticsearch.
I need to store 2 million documents every minute, but I'm struggling to reach that rate,
so I'd like some advice on my architecture.
The pipeline is divided into four stages:

1. Python code sends HTTP API requests to multiple sites, then fetches and formats the data. I use django-celery and RabbitMQ. This stage never became a bottleneck in any of my tests.
2. The stage-1 servers push the formatted data to Redis. The volume is more than 2 million documents/min (~30,000/sec).
3. Logstash reads the data from Redis and sends it to Elasticsearch. Redis and Logstash are installed on the same server (an AWS EC2 instance).
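For context, this is roughly what stages 1 and 2 do: wrap each formatted API response in an envelope and push it onto the Redis list that Logstash reads. A minimal sketch (the field names `_index`, `_id`, `_document`, and `timestamp` match my Logstash config below; `format_document`, the index naming scheme, and the client object are just for illustration):

```python
import json
import time

def format_document(site, raw):
    """Wrap one formatted API response in the envelope Logstash expects."""
    return {
        "_index": f"{site}-{time.strftime('%Y.%m.%d')}",  # hypothetical index naming scheme
        "_id": str(raw["id"]),                            # assumes each API record has a unique id
        "_document": json.dumps(raw),                     # parsed later by the json filter
        "timestamp": int(time.time() * 1000),             # UNIX_MS, matched by the date filter
    }

def push_documents(redis_client, docs, key="logstash"):
    """Serialize documents and push them onto the Redis list the redis input reads.

    `redis_client` is anything with an rpush(key, *values) method,
    e.g. a redis-py client; data_type => "list" in Logstash pops from this list.
    """
    payloads = [json.dumps(format_document(site, raw)) for site, raw in docs]
    redis_client.rpush(key, *payloads)
    return len(payloads)
```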
I've run the test several times under different settings, but indexing is slower than I expected and documents keep piling up in Redis.
I suspect Logstash becomes the bottleneck once I increase the number of Elasticsearch nodes:
with multiple nodes, the write queue stayed almost empty during the test, whereas with a single node it was always full and es_rejected_execution_exception occurred frequently.
Is there any recommended way to increase the processing rate, other than scaling up each server?
---What I've done for tests---
[ES] 8GB 1 node + [AWS] m5.large (ram 8GB) - almost 2500 documents / sec
[ES] 58GB 2 nodes + [AWS] m5.large (ram 8GB) - almost 5000 documents / sec
[ES] 58GB 2 nodes + [AWS] m5.xlarge (ram 16GB) - almost 10000 documents / sec
[ES] 58GB 3 nodes + [AWS] m5.xlarge (ram 16GB) - almost 15000 documents / sec
[ES] 58GB 3 nodes + [AWS] m5.2xlarge (ram 32GB) - almost 20000 documents / sec
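To put those numbers in perspective: even at my best measured rate (~20,000 docs/sec), the incoming rate of 2 million docs/min outpaces indexing, so the Redis backlog keeps growing. A quick back-of-the-envelope check:

```python
incoming_per_sec = 2_000_000 / 60   # ~33,333 docs/sec arriving in Redis
best_indexed_per_sec = 20_000       # best measured rate (3 nodes + m5.2xlarge)

deficit = incoming_per_sec - best_indexed_per_sec
print(f"backlog grows by ~{deficit:.0f} docs/sec")        # ~13333 docs/sec
print(f"that is ~{deficit * 60:.0f} docs/min piling up")  # ~800000 docs/min
```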
---Logstash config---
input {
  redis {
    host => "localhost"
    port => 6379
    codec => json {}
    data_type => "list"
    key => "logstash"
    password => "${REDIS_PWD}"
  }
}

filter {
  mutate {
    rename => ["_index", "[@metadata][_index]"]
    rename => ["_id", "[@metadata][_id]"]
  }
  json {
    source => "_document"
  }
  date {
    match => ["timestamp", "ISO8601", "UNIX_MS", "UNIX"]
    target => "timestamp"
  }
  mutate {
    remove_field => ["_document", "@timestamp"]
  }
}

output {
  elasticsearch {
    hosts => ["<myhost>"]
    user => "elastic"
    password => "<mypassword>"
    index => "%{[@metadata][_index]}"
    document_id => "%{[@metadata][_id]}"
  }
}
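For anyone reading along, here is a rough Python illustration of what I understand the filter block to do to each event pulled from Redis (simplified; real Logstash events carry more fields, and the date parsing step is omitted):

```python
import json

def apply_filter(event):
    """Approximate the mutate/json/remove_field steps from the config above."""
    # mutate { rename } moves routing fields into @metadata so they are not indexed
    meta = {"_index": event.pop("_index"), "_id": event.pop("_id")}
    # json { source => "_document" } parses the embedded JSON into top-level fields
    event.update(json.loads(event["_document"]))
    # date { match => ["timestamp", ...] } would normalize event["timestamp"] here (omitted)
    # mutate { remove_field } drops the raw payload and the default @timestamp
    event.pop("_document", None)
    event.pop("@timestamp", None)
    return meta, event  # meta drives index/document_id in the elasticsearch output

raw = {"_index": "logs-2024.01.01", "_id": "a1",
       "_document": json.dumps({"user": "kim", "value": 7}),
       "timestamp": 1700000000000}
meta, doc = apply_filter(raw)
print(meta["_index"], doc["user"])  # logs-2024.01.01 kim
```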