Clone cluster with Logstash

Hey Everyone,

We're trying to figure out the best way to completely clone a cluster, and Logstash seems to be the best option at this scale as it doesn't require a custom solution.

An example configuration of what it would look like is this:

input {
    elasticsearch {
        hosts => ["yourhost"]
        user => "**********"
        password => "*********"
        index => "*"
        size => 1000
        scroll => "1m"
        codec => "json"
        docinfo => true
    }
}
# filters are optional and can be added here if needed
filter {
}

output {
    elasticsearch {
        hosts => ["yourhost"]
        user => "********"
        password => "********"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
    }
} 
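
We'd run this as a one-off pipeline, something like the following (the config path is just an example):

bin/logstash -f /etc/logstash/conf.d/clone-cluster.conf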

The cluster is large with around 2TB being ingested daily, which is why we cannot go with a snapshot/restore. There are a few questions I have regarding how Logstash handles reading from Elasticsearch:

  1. When given a wildcard as an index pattern, is there logic behind the order of reads?
    To be more precise, does it sort the indices by name, by some metadata like creation_date, or does it read them in whatever order Elasticsearch returns them?

  2. Does Logstash know which indices it has read?
    That is, will it read one index multiple times and try to index the same data into Elasticsearch, or does it keep a record somewhere of which ones it has already gone through?

  3. What happens upon a restart of Logstash?
    This is more of a follow-up to the second question. If it does know which indices it has read, does that logic go out the window upon a restart?

  4. What happens when a document with the same ID already exists on the DEST cluster?
    If a document with the same ID and version exists on the destination cluster, what happens then? In a remote reindex you can tell Elasticsearch how to behave in these scenarios; is it the same for Logstash?

Thanks in advance for any help!

Cheers,
Luka

Why not try a remote reindex?

However, for both a remote reindex and Logstash, if you stop the process and restart it, it will run from the beginning and redo all the indices. Neither tracks state.
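
A minimal sketch of what that could look like, assuming placeholder hosts and credentials (note the source host also needs to be whitelisted via reindex.remote.whitelist on the destination cluster):

# "op_type": "create" only creates documents that are missing on the
# destination, and "conflicts": "proceed" counts version conflicts instead of
# aborting when a document with the same ID already exists.
curl -u user:pass -X POST "https://desthost:9200/_reindex?wait_for_completion=false" \
  -H 'Content-Type: application/json' -d'
{
  "conflicts": "proceed",
  "source": {
    "remote": { "host": "https://sourcehost:9200", "username": "user", "password": "pass" },
    "index": "index-1"
  },
  "dest": { "index": "index-1", "op_type": "create" }
}'

That op_type/conflicts combination is also how you would control the duplicate-ID behaviour from your question 4.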

Hi Mark,

Thanks for the answer. Unfortunately it is not possible to do a remote reindex of multiple indices and write them to the dest cluster under the same names. You can see the discussion here: Remote reindex with wildcard to multiple indices - #7 by Christian_Dahlqvist

Cheers,
Luka

To frame the discussion and correctly identify any constraints on a possible solution it would be useful to have the following questions answered:

  1. What is the use case? What type of data are you ingesting?

  2. Are you using time-based indices, either through rollover or index naming convention?

  3. Is the data you index immutable or do you perform updates and/or deletes?

  4. How are you indexing data into the cluster? What does your ingest pipeline/process look like?

  5. Is this a one-off migration to a new cluster or do you intend to try and keep the clusters in sync?

Hi Christian,

  1. What is the use case? What type of data are you ingesting?

The use case is that we need to mirror the documents in the cluster 1-1. The mappings and everything else are exactly the same. They are two separate clusters which serve as backups for each other.

  2. Are you using time-based indices, either through rollover or index naming convention?

We are using time-based indices, and they are managed by ILM. However, the idea is that the indices will be replicated under the same name with the same data. We will manipulate ILM by setting "indexing_complete: true", and copy the creation date from each source index to the origination_date on the destination so they're identical on both clusters. Afterwards we would set the write index for each rollover alias.
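
For reference, the plan per copied index would look roughly like this (host, index and alias names as well as the epoch-millis date are placeholders):

# Mark the copy as already rolled over and align its ILM age with the source.
curl -u user:pass -X PUT "https://desthost:9200/my-index-000001/_settings" \
  -H 'Content-Type: application/json' -d'
{
  "index.lifecycle.indexing_complete": true,
  "index.lifecycle.origination_date": 1651000000000
}'

# Then point the rollover alias at the newest copy as its write index.
curl -u user:pass -X POST "https://desthost:9200/_aliases" \
  -H 'Content-Type: application/json' -d'
{
  "actions": [
    { "add": { "index": "my-index-000002", "alias": "my-alias", "is_write_index": true } }
  ]
}'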

  3. Is the data you index immutable or do you perform updates and/or deletes?

The data is immutable.

  4. How are you indexing data into the cluster? What does your ingest pipeline/process look like?

We ingest the data with Kafka's Elasticsearch connector; Logstash would run on a standalone VM which serves only for DR purposes.

Cheers,
Luka

Replicating data between clusters in a reliable, performant and efficient manner is not an easy problem to solve, especially when considering all the types of issues that need to be dealt with. This is exactly the use case Cross Cluster Replication (CCR) was designed for, and as it solves an important and difficult problem it is a commercial feature. Logstash was never designed for this and is IMHO not a viable solution, as it does not maintain state and therefore cannot transfer only the new data.

Before CCR was available, the most common way to solve your use case was (and still is if a commercial license is not an option) to index data in parallel into the two individual clusters. You are in a good position to do this as you are using Kafka and just need to add another connector that independently writes to the other cluster. This does not necessarily come without some complications but is still worth considering. Note that you will need to make sure that index templates and configuration are synchronized, as those are separate from the data.
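
As a rough sketch, assuming Confluent's Elasticsearch sink connector and placeholder names, topics and URLs, registering a second independent connector could look like this:

# Hypothetical second sink connector writing the same topics to the other
# cluster; name, topics and connection.url are placeholders.
curl -X POST "http://kafka-connect:8083/connectors" \
  -H 'Content-Type: application/json' -d'
{
  "name": "elasticsearch-sink-dr",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "https://dr-cluster:9200",
    "topics": "my-topic"
  }
}'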

Hi Christian,

Of course, Logstash is not used for that purpose here. Kafka is already used to ingest data into the two clusters. We are talking about a DR situation here in which one cluster is corrupted, the DC is no longer functional, and so on.

What we are looking for in this post is a complete DR situation, where recovery of the second Elasticsearch is not possible, and we need to replicate everything from the available cluster.

Cheers,
Luka

For that I would recommend the snapshot and restore functionality. If you want to reindex I would recommend you create your own scripts as this will give you greater control, but reindexing is a lot more expensive than restoring a snapshot. I do not think Logstash is at all suitable here.

A snapshot/restore is not possible for multiple reasons, the biggest one being the massive size of the cluster. Remote reindexing was our original idea, but that requires a sophisticated script, which is a custom solution.

For what reason do you believe Logstash to not be suitable in this scenario?

How long do you think it will take to reindex all this data? This tends to use a lot more resources and be slower than restoring a snapshot.

How much data does the cluster hold?

How much time do you have to get the second cluster reindexed and ready?

How far apart are the clusters located? What does the latency and available throughput of the network between the clusters look like?

When it comes to reindexing as a solution, either through Logstash or a custom script, you need to consider your cluster capacity. If we e.g. assume you need to reindex 7 days' worth of data from one cluster to another and you want this to complete within one day, your source cluster needs to support reading all this data while still handling real-time indexing, and the destination cluster needs to be able to handle 7 times the normal indexing load (assuming it does not also serve real-time indexing off Kafka). Is your cluster sized with spare capacity to handle that load?
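
To put rough numbers on it using the ~2TB/day you mentioned: 7 days of data is about 14TB, so completing within one day means the destination has to index roughly 14TB in a day, seven times its normal daily volume, before counting any live ingest it may also be serving.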

As far as I know the Logstash Elasticsearch plugin does not maintain any state. It will query all the data every time it runs. If there is any hiccup or the process takes too long (which will happen if you are trying to reindex a lot of data) I believe the data will be reprocessed as it restarts. If you have massive amounts of data to get through I suspect it may never finish.

To see what I mean, set up a test cluster and try to use this method to reindex a few TB of data from an existing cluster and see how it works.

How long do you think it will take to reindex all this data? This tends to use a lot more resources and be slower than restoring a snapshot.

We do not know; this is something we can test by remote reindexing a specific index which is 1TB in size and seeing how long it takes.

How much data does the cluster hold?

At the moment, not a lot. But the actual capacity of the cluster, once it fills up over time, is 743TB.

How much time do you have to get the second cluster reindexed and ready?

There is no time limit; it's something we are currently investigating in case DR is ever needed. It's also not time sensitive if it does happen.

How far apart are the clusters located? What does the latency and available throughput of the network between the clusters look like?

The latency and network speed are of no concern - they are connected via dark fiber, and are not geographically too far apart.

If that is how Logstash functions with the Elasticsearch input plugin, then it is definitely not a valid option. That is essentially what we wanted to know. Looks like our only option is to write a script which will use a reindex and go index by index, something like the sketch below.
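
Roughly along these lines; hosts, credentials and the index pattern are placeholders, and for 1TB indices a real version would submit each reindex with wait_for_completion=false and poll the Tasks API instead of blocking:

#!/usr/bin/env bash
# Index-by-index remote reindex sketch.
SRC="https://sourcehost:9200"
DEST="https://desthost:9200"
AUTH="user:pass"

# List matching indices on the source cluster, oldest first.
for idx in $(curl -s -u "$AUTH" "$SRC/_cat/indices/my-pattern-*?h=index&s=creation.date"); do
  echo "Reindexing $idx"
  curl -s -u "$AUTH" -X POST "$DEST/_reindex" \
    -H 'Content-Type: application/json' -d"
{
  \"conflicts\": \"proceed\",
  \"source\": {
    \"remote\": { \"host\": \"$SRC\", \"username\": \"user\", \"password\": \"pass\" },
    \"index\": \"$idx\"
  },
  \"dest\": { \"index\": \"$idx\", \"op_type\": \"create\" }
}"
  echo
done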

Thanks a bunch for the help Christian!

Cheers,
Luka

I added a comment about cluster capacity as an edit. Make sure you take this into consideration as well.

If you want to reindex multiple indices in parallel, make sure you test this as well.

Hi Christian,

Following up here as the groundwork has already been laid out. Testing out the remote reindex, I see that slicing is not supported for remote reindexes and that the maximum batch size is 10k by default (determined by index.max_result_window).

Is there any way to parallelize a remote reindex by other means? Maybe some parameters we overlooked? So far, we haven't been able to get an indexing rate over 30k/s, and for that specific index type our normal ingest rate is 60k/s.

Our cluster tops out at an index rate of about 300k/s when catching up with Kafka.

Let me know if you want me to open a new thread for this question since it's not directly related to the subject.

Cheers,
Luka

Not that I am aware of. This is one of the reasons I would not reindex large amounts of data from remote clusters. Even if you managed to get the speed up, if something went wrong you would need to restart the index you are reindexing from scratch every time.

I still do not understand why you can not use snapshot and restore, which is what I would use. Can you please elaborate?

I still do not understand why you can not use snapshot and restore

It's not that we can't. We'd just prefer a solution which didn't involve a third NFS/S3 storage system, because that storage is not under our direct control.

Hi @Christian_Dahlqvist,

We're having some issues maximizing our Logstash's throughput. This is the current configuration:

input {
    elasticsearch {
        hosts => ["dc1myhost"]
        user => "**********"
        password => "*********"
        index => "index-1"
        size => 10000
        scroll => "1m"
        slices => 10
        docinfo => true
        ssl => true
        ca_file => "/etc/logstash/certs/ca.crt"
    }
}

output {
    elasticsearch {
        hosts => ["dc2myhost"]
        user => "********"
        password => "********"
        index => "index-1-copy"
        action => "create"
        ssl => true
        cacert => "/etc/logstash/certs/ca.crt"
    }
} 

We've tried playing around with the pipeline.workers and pipeline.batch.size settings, but nothing seems to change the index rate on the dest, which caps out at about 20k/s. Giving more memory to the JVM also doesn't change a thing.
The VM has 10 CPUs and 32GB of memory, 22GB of which are dedicated to the Logstash JVM. The number of CPUs and the memory can be increased if needed; we have plenty of resources.

What I've noticed is that increasing the number of workers above 10, to let's say 20, makes Logstash use about 7-8 CPUs compared to 3-4 with the default of 10, but on the dest the indexing rate of said index does not increase at all, which I find bizarre.
There are absolutely no errors in the Logstash logs either.

All of the threads and posts about optimizing Logstash output are related to general optimizations, but what we need is to maximize the throughput of a single pipeline, since we'd be reading from one single index and writing to another.

Any help would be appreciated.

Cheers,
Luka

As the reading from the index is not sliced by default, I suspect this would hit the same bottleneck as the reindex from remote. You may try to specify the number of slices and increase throughput that way, but I am not sure how much effect it will have.

Sorry about that; I had pasted an old configuration. It's edited now.

The number of slices is 10, as is the number of primary shards for that index.

I have never used Logstash for a remote reindex and do not think it is a good idea, so I will not be able to help with this. Unless someone else chimes in, you will need to test.