Using Logstash as a bulk index buffer for ES

Hi,

I'm trying to use Logstash as a buffering tool to spread large spikes of bulk requests over time, so they don't crash my ES cluster. The idea is to consume documents from a Redis queue and bulk them to ES after a short wait of 5 s or once 2000 items have accumulated, whichever comes first. I've experimented with various parameters, but the end result is extremely slow: Logstash typically processes fewer than 100 items per second, while I'm accumulating them at about twice that rate.
My conf goes like this:

input {
  redis {
    # Consume the documents that the applications push onto this Redis list
    key => "es_docs"
    data_type => "list"
    codec => "json"
    # threads => 4
    # batch_count => 125
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # Target index, type and id are taken from fields carried by each document
    index => "%{[index]}"
    document_type => "%{[doc_type]}"
    document_id => "%{[id]}"
    # Buffering knobs I experimented with (the "2000 items or 5 s" flush)
    # flush_size => 2000
    # idle_flush_time => 5
  }
  stdout { codec => "dots" }
}

I'm running it with the -b 1000 and -w 4 CLI parameters, and 4 GB of memory for the JVM.
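For completeness, the launch command looks roughly like this (the config file name is a placeholder, and LS_HEAP_SIZE assumes Logstash 2.x; on 5.x the heap is set in config/jvm.options instead):

# 4 GB heap, 4 pipeline workers, batches of 1000 events per worker
LS_HEAP_SIZE=4g bin/logstash -f redis_to_es.conf -b 1000 -w 4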

I should also add that our documents are quite big, averaging a few KB each.
Is Logstash a good tool for what I want to achieve? What am I doing wrong?

Thank you for your time.

How are they crashing the cluster?

Too many requests at the same time, I guess. The cluster was OOMing each time I detected a spike in indexing requests, leaving all nodes unresponsive [1].

That's why I'm trying to spread the load with Logstash.

[1] Related discussion:
https://discuss.elastic.co/t/repetitive-cluster-crashes-due-to-high-memory-pressure/67378?source_topic_id=68090

It looks like we are trying to work around a symptom, so I would recommend starting by taking a step back. Can you tell us a bit more about your use case? What type of data are you indexing?

I can see that you set index name and type dynamically. How many indices and types do you have? How often are new types created or mappings changed? How many shards do you have in the cluster? What is the average shard size?

You are also explicitly setting the document_id. Is this because you have a requirement to be able to update documents? If so, how frequently do you perform updates?

Sure. I am indexing JSON documents representing different compound database models. For example (made up): we combine a cooking recipe with its ingredients (a one-to-many relationship) into a single document. Some of them are large, with sizes typically ranging from 20 to 100 KB of JSON.

I have 4 indices and 1 type per index. I don't create new types, and I have a "fixed" mapping (not all fields are defined, but the document structure doesn't change over time; when it does, I reindex everything).
I have one primary shard per index plus 1 replica, with 2 nodes in the cluster (3 in production), and an average index (and therefore shard) size between 200 and 500 MB. I increased the number of primary shards to 5 for indices over 1 GB. However, I found that going from a single shard to 2-4 primary shards already yielded encouraging indexing performance results.
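For reference, shard sizes and their placement across nodes can be double-checked with the _cat API (same localhost:9200 endpoint as in the Logstash output):

# One line per shard: index, shard number, primary/replica, size on disk, node
curl -s 'localhost:9200/_cat/shards?v&h=index,shard,prirep,store,node'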

I am explicitly setting the document id to the one I have in the database, because I want to update some of the documents (at most 400/min), GET specific documents, and be able to spot potential disparities between my ES index and the database table where they originally sit.

I'm aware that I might be treating a symptom, but I've had memory issues in my ES cluster for a while. Some of the tests I've run (increasing the bulk size) seem to confirm that it's a matter of throttling request spikes. That seems like a better idea than just upgrading my cluster (which is already very expensive, and I can't do that forever anyway).

Thank you for your time.

What indexing rate do you need to support? What is the size of the cluster? What does the query load look like?

I have one production cluster of 3 nodes (8 GB of memory each) working fine, with an average indexing rate of 50 req/s (briefly spiking to 1000/s at regular intervals) and a search rate of 20/s during the day.

It looks like this:

However, I have another staging cluster of 2 nodes (16 GB of memory each) barely keeping up with a similar indexing rate and a search rate of 0.4/s. Since I've allocated only one primary shard to most of the indices, I wouldn't expect a lot of difference.

As a side note, I'm creating new documents at a lower rate, with peaks of 500/minute, since I'm sending indexing requests with a specific id that may already exist in ES (because it was previously indexed/updated).

So the production cluster has 3 8GB nodes and the staging cluster has 2 8GB nodes? What does the equivalent graphs look like for the staging cluster?

The staging cluster has 2 larger 16GB nodes. I just upgraded it to 3 16GB nodes this morning, because I needed it to be usable in order to test a new feature. Unfortunately, it didn't change anything and nodes are still crashing.

The graph is not pretty; areas where it reaches 0 are just missing data due to one or more nodes crashing/OOMing:

To get a better view of what's going on, I used another tool:


In purple: the number of bulk requests queued on one node (0 == missing data; see the snippet below)
In blue: the total number of indexing operations executed on the cluster (the huge spike is suspicious; it was right after a recovery)
In red: the average old gen GC time per node (one of them is not doing very well)
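Roughly the same queue figure as the purple line can be pulled straight from the nodes stats API; the thread pool is named "bulk" on the ES versions in play here (later versions renamed it to "write"):

# Per-node bulk thread pool stats (queue depth, rejections)
curl -s 'localhost:9200/_nodes/stats/thread_pool?filter_path=nodes.*.name,nodes.*.thread_pool.bulk&pretty'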

If the larger nodes are having problems and the smaller ones are running fine under what looks like heavier load, there must surely be something that is different. Do they hold the same amount of data and number of documents? Are the numbers of indices and shards the same? Are the mappings for all indices the same? Are there any differences in the load or in the type of queries/aggregations?

The data is not well spread; the faulty node is currently holding fewer documents than its peers. The number of shards is similar (around 45 on each node). The mappings are almost entirely different from one index to another.

However, I've identified that 2 heavy indices were quite unbalanced and had more than half of their primary shards on that failing node. I've triggered a manual shard reallocation to see (without much conviction) whether the load is better spread.
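For anyone curious, a manual move like this can be done with the cluster reroute API, something along these lines (index and node names below are placeholders):

# Move primary shard 0 of a heavy index off the struggling node
curl -s -XPOST 'localhost:9200/_cluster/reroute' -H 'Content-Type: application/json' -d '
{
  "commands": [
    { "move": { "index": "heavy_index", "shard": 0, "from_node": "node-2", "to_node": "node-3" } }
  ]
}'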

EDIT: Well, no, the load is not spread evenly and it just crashed another node.

I think I found the root cause of all my issues. I discovered a couple of 45+ MB documents hidden in a few indices. They definitely should not be that large; I'm going to investigate.

This would explain both problems: why Logstash was extremely slow and why my ES cluster was choking to death under small/medium load. I could have found this out earlier by properly monitoring the average size of the documents being sent to ES.
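As an aside, one way to catch oversized documents like these after the fact is the mapper-size plugin, which stores each document's size in a queryable _size field. This assumes the plugin is installed and the index is created with _size enabled; the index and type names below are placeholders:

# Enable the _size field when creating the index (requires the mapper-size plugin)
curl -s -XPUT 'localhost:9200/recipes_v2' -H 'Content-Type: application/json' -d '
{
  "mappings": {
    "recipe": { "_size": { "enabled": true } }
  }
}'

# Then hunt for oversized documents, e.g. anything above 10 MB
curl -s 'localhost:9200/recipes_v2/_search?pretty' -H 'Content-Type: application/json' -d '
{
  "query": { "range": { "_size": { "gte": 10485760 } } }
}'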

Thank you for asking questions; it helped me get a better idea of what the real issue is (if confirmed).

EDIT: I confirm this was the real issue. Thank you for your time!

May I ask what your overall architecture looks like?
Are you using Redis -> Logstash -> ES?
Where does your own application sit?

Thank you.

The architecture I was trying to build was like this:

Application 1 --|
Application 2 --|--> Redis -> Logstash -> ES
Application 3 --|

Everything was meant to run on the same host (except ES). Since I no longer experience crashes in ES, I've postponed the task, so it's not actually implemented.
