Advice on Scaling writes

Hi,

Our systems are generating CSV files containing all events that happened during the past hour.
There are about 15,000 CSV files generated every hour, each containing thousands of events.
A quick estimate puts this at about 70,000 events captured per second across these files.

Each line of each CSV contains one event, organized as a unique ID, a CSV ID, a timestamp, a free-text field, and then a few numeric fields.
The two main use cases would be full-text search on the free-text field and aggregations on the numeric fields over time buckets.

The data would ultimately reach about 5-6 TB once it hits max retention.

The idea would be to use filebeat to send data to logstash and index everything in ES.

Being new to ES, I tried creating one node and looking at the ingestion rate. I barely reach 4k events/s and it eats all 4 cores of the test machine.

I understand that increasing the number of logstash pipelines will help scale up my writes. Correct?

Then increasing the number of ES nodes might also improve the situation; however, it will multiply my hot storage needs, correct?

Finally, adding more CPU cores should also help scale my writes, as CPU is currently the bottleneck on the test machine.

If I am correct in the previous assumptions, is there any advice you could give me on sizing this? Is there any other setup that would help scale my writes?

I'm also thinking of using rollover to limit each index size to 200 GB, as I've read performance will degrade beyond that (shards above 20-25 GB).
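
To make that concrete, I'm picturing something like the ILM policy below (just a sketch; the policy name is made up and the threshold is my 200 GB guess):

PUT _ilm/policy/my-rollover-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "200gb"
          }
        }
      }
    }
  }
}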

Looking for any advice, thanks for your help.

How about trying to remove components first, and only then worrying about scaling out? Your use case might not need Logstash; over time some helpers have been added for certain use cases, and yours might just fit one of them.

First, what are you using Logstash for? If it is just CSV processing, you could check whether Filebeat's CSV decoding can do that, or whether the Elasticsearch CSV processor can.

The latter lets you do a set of common enrichment steps right in Elasticsearch and probably lets you drop Logstash as a required component. Also, you can scale out the nodes doing ingestion independently from the data nodes in Elasticsearch.
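
For the Filebeat route, the decode_csv_fields processor would look roughly like this (the field name is a placeholder):

processors:
  - decode_csv_fields:
      fields:
        message: decoded_csv   # the decoded values land in this field as an array
      separator: ","
      ignore_missing: false
      fail_on_error: true

Note that this only splits each line into an array of values; assigning them to named fields is where the Elasticsearch CSV processor mentioned above is more convenient.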

Regarding scaling writes: adding more cores always helps, as thread pools are sized based on the number of cores. Also, adding more primary shards might help spread the write load across several nodes (#primaries >= #nodes).
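
For example (index name made up), you could create the index with several primaries up front:

PUT my-events
{
  "settings": {
    "index.number_of_shards": 3,
    "index.number_of_replicas": 1
  }
}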

Hope this helps as a start. Otherwise, feel free to add some more information about what you are doing with Logstash.


Thanks for the fast response Alexander!

I'm only doing ingestion, no transforms or filtering of any kind (that is already done earlier in the process, before generating the CSVs), so the standard ingest pipeline CSV processor would totally work.
However, the back pressure you get from Logstash's persistent queues is a great advantage for me to handle peaks.

I've seen that you can assign roles to nodes, like ingest indeed, but would that mean these nodes only do processing and don't store data, and that all ingested data would only be stored on the master or data nodes?

Also, all this should ideally happen on the same machine (32 cores, 256 GB RAM), so maybe Logstash makes more sense, scaling the Logstash nodes instead of the ES nodes?

One more thing: we will modify our output from CSV to ECS logging so it handles nested objects; that might ease compatibility as well.

You should be able to achieve a similar form of back pressure with Filebeat as well, as Elasticsearch returns an error if it is overloaded and Filebeat then just backs off for a bit.
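
I believe the relevant knobs on the Filebeat side are the output backoff settings (the values shown here are the defaults):

output.elasticsearch:
  hosts: ["localhost:9200"]
  backoff.init: 1s   # wait this long before the first retry after an error
  backoff.max: 60s   # the retry delay grows exponentially, capped at this value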

If everything is supposed to happen on a single machine, I think it makes even more sense to reduce the number of processes involved so there are more resources left for indexing. Also, persistent queues are less needed when everything runs on the same system, IMO.


OK, thanks for the advice. I've configured a direct Filebeat-to-ES pipeline (Kibana + yml file).
I've dumped around 400 MB of CSV files into a folder as a test and started Filebeat.
My indexing rate is quite lame, less than 4k/s.


Just one node on a single machine, 4 vCPU, not even used at 100%... I really don't get where my bottleneck is.

Indexing into Elasticsearch is I/O intensive and often limited by disk performance rather than CPU. What type of storage are you using? I would also recommend reviewing this section of the documentation and making sure you follow the recommendations.
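
For example, one of the recommendations on that page is to relax the refresh interval (and drop replicas) while bulk loading; a sketch, assuming an index named test:

PUT test/_settings
{
  "index.refresh_interval": "30s",
  "index.number_of_replicas": 0
}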


I'm on the standard SSD on AWS

Netdata shows the following:

IOwait shows disk writes peaking at 40 MB/s... an SSD should definitely handle that.

If you run iostat -x, what does iowait look like while you are indexing?
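
Something like this prints the extended stats every 5 seconds, so you can watch iowait and device utilization while the indexing is running:

iostat -x 5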



here you go

Hi @Baygon

First, @Christian_Dahlqvist is a master at this stuff, so I am glad you have him and @spinscale helping you.

I have a bit of experience scaling ingest, especially with Filebeat. 70K EPS sustained is non-trivial but certainly doable...

First I would think of Filebeat as a "Thin Pipe" and Logstash as a "Thick Pipe" both have their uses.

Logstash vs Filebeat... let's have that discussion later; you can make either work (for this use case Logstash with multiple pipelines / workers might make more sense, but I am going to stick with Filebeat for right now).

You will not achieve 70K EPS with a single Filebeat. I doubt you were expecting that, but just in case.

< insert long history and discussion here... filebeat was originally meant to be an edge shipper, now it seems to be used for much more than that etc... etc... / >

Currently Filebeat is single-threaded on the input side (many of us are hoping that will change at some point).

I appreciate you are trying to do some benchmarking to come up with "Scaling Units" for your components.

So at this point I am going to make a WHOLE lot of assumptions :slight_smile:
(Looks like you have good HW and Christian is having you check that)

So this is what I would do.... these are just suggestions with some starting configs / points etc.

Lets focus on Filebeat -> Elasticsearch Architecture.

I am not sure where you are doing the CSV parsing; I think you are using an ingest pipeline.

1st I would set up a single 64 GB / 8+ core Elasticsearch node tuned for ingest.

2nd I would set up a single Filebeat VM with 1-2 GB RAM and 1 or 2 vCPU (these might need to be adjusted a bit)

These parameters are a starting point:

# Approach A : Tuning Parameters (filebeat.yml)
queue.mem:
  events: 4096               # size of the internal event buffer
  flush.min_events: 2048     # minimum number of events before the queue flushes to the output
  flush.timeout: 1s          # or flush after this timeout, whichever comes first

output.elasticsearch:
  bulk_max_size: 200         # events per bulk request
  worker: 4                  # concurrent workers publishing to Elasticsearch
  pipeline: your-csv-pipeline   # ingest pipeline that does the CSV parsing

Then I would observe the throughput. I suspect it should be a little better than the 4K/s, but probably not a huge amount more, perhaps 5-8K/s. I would also do a quick check that CPU, RAM, etc. are fine on the Filebeat VM. If RAM or CPU are tight, give it a bit more...

I would think of this as your "Filebeat Scaling Unit"

3rd Then I would horizontally scale Filebeat. I will make a wild assumption that all the files are not in the same directory or that you will have a way to split the paths.

At some point you will probably start to get back pressure from the ES node, assuming you have tuned it for ingest; see here.

4th Then you will know what your ingest/sec per ES node is, and you can scale the ES nodes.

5th If I was going to jump all the way to the end... and wanted 70K EPS sustained ...

I am thinking that your cluster will have at least 3 x 64 GB ES nodes, with the index having 3 primary shards to aid parallelism (or 4 nodes, 4 shards, etc.), and you will have at least 10 Filebeats or a beefy Logstash.

Just thoughts on a Saturday morning...


Hi Stephen, and wow, what an answer for a Saturday morning.

I will work on trying that tomorrow (it is nearly midnight here), but just a bit of comment.
I didn't notice any single-threading during ingestion; the load was well spread across the 4 cores during the test, but that might be the ES overhead, OK with that.

I am indeed using an ingest pipeline:

[
  {
    "csv": {
      "field": "message",
      "target_fields": [
        "column1",
        "column2",
        "column3",
        "column4",
        "column5",
        "column6",
        "column7",
        "column8",
        "column9"
      ],
      "ignore_missing": false
    }
  },
  {
    "date": {
      "field": "column5",
      "formats": [
        "ISO8601"
      ]
    }
  },
  {
    "convert": {
      "field": "column1",
      "type": "long",
      "ignore_missing": true
    }
  },
  {
    "convert": {
      "field": "column2",
      "type": "long",
      "ignore_missing": true
    }
  },
  {
    "remove": {
      "field": "message"
    }
  }
]
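
For reference, I registered it under the name used in the Filebeat config below, roughly like this (with the processor array above pasted into "processors"):

PUT _ingest/pipeline/test-pipeline
{
  "description": "CSV parsing for the test index",
  "processors": [ ...the processor array above... ]
}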

I'm not fully sure Filebeat is actually using it properly, since a lot of logs are ending up in the filebeat index instead of the test index.
Config yml:


- type: log
  enabled: true
  paths:
    - /opt/logs/*.csv

Output:

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["localhost:9200"]
  indices: 
    - index: "test"
  pipeline: test-pipeline
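
Maybe I should instead set the index directly; from the docs it looks like that also requires setup.template.name / setup.template.pattern (and disabling ILM setup on 7.x), something like:

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "test"
  pipeline: test-pipeline

setup.template.name: "test"
setup.template.pattern: "test*"
setup.ilm.enabled: false   # otherwise ILM setup overrides the custom index name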

I'll definitely try your config options, especially the worker option... I guess that will help a lot.

Files are all in the same directory; however, they will be fed by a queue manager, so I have the flexibility to spread them over multiple directories (guessing it would be a multiple of the number of nodes to spin up).

Given my limited experience with ES, I'd like to understand more about my ingest nodes. Do they hold a full replica of the data, or are they only gateways to the data/master nodes?

If I can avoid having 3 x 64 GB nodes and instead have 1 big master (probably 16 CPU, 128 GB RAM) and 2 small ingest nodes, that would be much better for me.

The end architecture would have one big machine ingesting raw data and processing it (very CPU intensive), and then the ES ingest would take place. Reads are quite limited.

Real quick, let's get some terms clarified.

If you are not defining node roles, all your nodes will be master, data, ingest, etc. (all roles), which is fine for your initial benchmarking and even for production, but I would review the roles here.

Nodes with all roles are fine at this point... but here are a few things to think about.

Master Nodes: Only keep track of the cluster state; they do not read / write data, and reads / writes should not be pointed at them. Typically in a production cluster they are separated out and are small, only 1-4 GB RAM.

Data Nodes: Where the writes / reads actually happen. These are your "big" 64 GB nodes; they need RAM / CPU / IO.

Ingest Nodes: Where ingest processors run before the write happens. Think of these as data processing / ETL nodes. They are typically 8 GB RAM, 2-4 CPU, etc. These nodes DO NOT write.
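
If/when you do split roles out, it is just the node.roles setting in each node's elasticsearch.yml (7.9+); a sketch, one line per node type:

node.roles: [ master ]          # dedicated master, in that node's elasticsearch.yml
node.roles: [ data, ingest ]    # data node that can also run ingest pipelines
node.roles: [ ingest ]          # ingest-only node, stores no data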

In general, more properly-sized data nodes are better than 1 big node.

3 x 64 GB nodes, or even 2 x 64 GB, is much better than 1 big data node... you will get more total JVM heap with 2 or 3 nodes than with 1 node, and JVM heap is a big part of ES performance.

Elasticsearch is a distributed data store; it is built to scale horizontally... vertical scaling beyond 64 GB has limited benefits / returns.

I would strongly recommend (and I can only recommend) the 3 x 64 GB configuration for your data nodes, not 1 big node... but start with one 64 GB node and see where you get.

My "beautiful" cluster would look like:

  • 3 x 1-2 GB masters
  • 3 x 64 GB data nodes
  • 3 x 8 GB ingest / coordinator nodes. I would make sure my writes were load balanced across the 3 ingest nodes.
  • The index would have 3 primary shards.

But just the simple 3 x 64 GB all-roles cluster, with the index having 3 primary shards, is certainly a solid configuration.

To clarify, I was talking about Filebeat internals: the input processing is single-threaded... (I believe).


OK, some updates from today's tests.

I think I can rule out Filebeat as the bottleneck. Monitoring the Filebeat logs in real time showed that the load on ES continues long after the harvesting of the files has completed. So the bottleneck is definitely the indexing part.

I've popped up a small cluster to test:
1x master + data with a 4 GB heap
1x data with a 3 GB heap
2x ingest with a 2 GB heap

I load balanced the 2 ingest nodes with nginx (round robin).
My writes are up to 5k/s (which is 1.5k/s higher than previous tests). Good! Thanks for helping me reach that already :slight_smile:
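
For reference, the nginx config is nothing fancy, roughly this (addresses are placeholders):

upstream es_ingest {
    server 10.0.0.11:9200;   # ingest node 1
    server 10.0.0.12:9200;   # ingest node 2
}

server {
    listen 9200;
    location / {
        proxy_pass http://es_ingest;   # default round-robin balancing
    }
}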

CPU does not seem to be a bottleneck, and disk is not a bottleneck either, so either I'm missing some config (shards?) or my heap is too small.

EDIT: I also ran another test with the 2 ingest nodes but only one master + data node, and I was still stuck at 3.5k/s, so doubling the data nodes helped.

Here's the kibana overview screenshot:

And the full index monitoring:

I notice that there are only 2 shards (1 primary). Not really sure how to set this up since I have 2 data nodes. Should I use 2 primaries?

Perhaps I made things more complicated than I needed to.

But good job setting up a multi-node multi-role cluster.

First, with the size of nodes you're testing right now, you'd probably have been better off with one 8 GB all-roles node.

Dividing up the roles in this small of a cluster does not add a ton of value in my opinion.

Also, you're thinking a bit backwards: masters are small, data nodes are large. Your data nodes should be the largest nodes in your cluster.

Masters do very little work (but important work); data nodes do a lot of work.

Your master can be one GB until you start to get a bigger more complex cluster.

According to your graphs you're only doing about 2.5k EPS; we typically talk in terms of the primary shard indexing rate, and your 5k/s includes the replica writes.

You're clearly an engineer, and you're going to look at CPU, IO, disk, etc., and spend a lot of time doing that...

I would set up an 8 GB all-in-one node with one primary and one replica, and scale up my Filebeats until I saturate that node.

Filebeat will probably not look like it's saturated, but it will still have limited throughput, so it may not show up as a bottleneck. You should be able to get a little more out of it than you have, but I can only give you advice.

You should be able to get above 5K EPS with a single, properly configured Filebeat.

Scale out your Filebeats, or switch over to Logstash, if you want to saturate your data node.

Run 4 filebeats in parallel and see if it saturates your data node.
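
Running several Filebeat instances on one box just needs separate config files and data paths, e.g. (paths and filenames made up):

# each instance gets its own config (different input paths) and its own data directory
filebeat -e -c /etc/filebeat/filebeat-1.yml --path.data /var/lib/filebeat-1 &
filebeat -e -c /etc/filebeat/filebeat-2.yml --path.data /var/lib/filebeat-2 &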

With enough ingest you'll be able to saturate your data node.

It seems like you're bottleneck hunting, which makes sense from one perspective; on the other hand, you're after 70K EPS, and none of the node sizes we are talking about are going to support that. But I guess you're just trying to get a sense of what ingest you can support with smaller nodes.


Thanks for your response.

I will try on a bigger cluster tomorrow then. My hunt is more about which bottleneck I need to scale away: is it really the architecture, or is it some settings on my shards, etc.?

If I understand you properly, my 5k/s is actually 2.5k/s (primaries), which means I'm even slower than the 3.5k/s on the 8 GB heap single node before...

I will try to have each worker fetching the data run its own Filebeat instance; that will help saturate the cluster. I will follow the settings above for the bulk writes.

But is it really the heap space? (80% saturated from monitoring)
I'm not scaling yet because I want to find the bottlenecks. My boss wouldn't like me popping up 3 x 1.5 USD per hour nodes just for fun :)
We're still a simple startup :)


Good Job,

Sorry, it is probably not heap space today. We often think of that because it often becomes a limiter as a cluster grows, but for what you are doing it will be more about CPU and IOPS, as Christian said.

Apologies, in my head I think in terms of an 8:1 GB-RAM-to-core ratio or smaller, so when I say 8 GB I am thinking about double the cores / IO of a 4 GB node, etc... It is just the scaling unit I think in.

I get the need to do testing on smaller scaling units. I would use a 4 GB / 1 core or 8 GB / 1-2 core scaling unit to start. Then it will be about the settings, the ingest, etc.

What type of HW are you on? Are you on AWS or something? If so, you should be using i3s for high-speed ingest.


Just to echo and reinforce @stephenb's earlier comments: if neither CPU nor IO are maxed out in Elasticsearch then it's likely that the limiting factors are not on the Elasticsearch side and instead that the clients (Filebeat in this case) are just not pushing it hard enough. Do you track the size of the write threadpool queues across nodes and the indexing pressure metrics? If Elasticsearch is the bottleneck then these numbers should be consistently large and the clients should occasionally receive 429s, whereas if they're usually small and every request succeeds then the bottleneck is elsewhere. If so there's no point in scaling Elasticsearch up or out, you should be investigating what you can do to get more throughput on the client side. Typically that means increasing concurrency.
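
Concretely, something along these lines shows the write thread pool queue / rejections per node and the indexing pressure stats (the latter needs a reasonably recent version):

GET _cat/thread_pool/write?v&h=node_name,name,active,queue,rejected
GET _nodes/stats/indexing_pressure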

A simple three-node cluster can ingest well in excess of 70k docs per sec (depending on the docs ofc) but to achieve this kind of throughput requires a very well-designed client setup.


Ok, I'll try that then.
It will take a few days to refactor the code to have a filebeat instance on each container producing the data.
Then I'll try to pop up a bigger data node, let's say 32 GB (16 GB heap). Will update when this is done.

Thanks everyone for the great advice!

I ran more tests this afternoon and managed to reach 15k/s with only one data node with a 10 GB heap.
Only one Filebeat instance running as well...
What worked the miracle, you ask? More CPU!

When running the previous tests the data node had access to 4 cores; today it had 8 cores. That was clearly the bottleneck.
I think what got me confused is that on 4 cores, CPU was only averaging around 70-80%.
In my previous ES cluster the ingest was done through Logstash, which systematically maxed out the CPU. It seems ingest with Filebeat and an ingest pipeline does not behave the same way, i.e. it does not max out all the cores, but CPU was still the bottleneck!

So 2 good 16-core data nodes with 64 GB each + 1 master + 2 ingest nodes, plus several Filebeats running in parallel, should totally do the trick!

I'm gonna close this thread since I got the expected result thanks to your help, but would you please clarify one last thing for me:
How do you calculate how many primaries you should have?