Migration from OpenSearch 1.1 to Elasticsearch 7.18 using Logstash

Hi Experts,

I am trying to migrate my OpenSearch cluster, version 1.1, to Elastic Cloud 7.18. I have created a Logstash pipeline for this, whose configuration looks like this:

input {
    opensearch {
        hosts => ["url"]
        user => "<user>"
        password => "<password>"
        index => "*"
        size => 100
        scroll => "1m"
        query =>  '{ "query": { "match_all": {}} }'
        docinfo => true
    }
}
filter {
}
output {
    elasticsearch {
        hosts => ["<elastic-cloud-url>"]
        user => "<username>"
        password => "<password>"
        index => "%{[@metadata][_index]}"
    }
    stdout { codec => rubydebug { metadata => true } }
}

The data for some indices ranges up to ~50 million documents, and I am not sure how error handling should be done for such a big load if something goes wrong during migration, so I have the following questions:

  1. Does Logstash provide any error handling out of the box that will make sure the entire migration (say, around ~60GB of data) completes seamlessly?

  2. If the migration fails due to a network error, how does Logstash handle this? Will the migration resume from the record where it failed, and how does Logstash avoid duplication in this scenario?

  3. Can we do batch migration in Logstash, and if yes, how?

  4. If any record of a batch encounters an issue during batch migration, does Logstash support any rollback and retry to make sure everything gets migrated successfully?

I am new to Logstash, so can someone suggest an industry-accepted way to configure a pipeline to migrate clusters (~60GB) seamlessly from one version to another?

OpenSearch/OpenDistro are AWS-run products and differ from the original Elasticsearch and Kibana products that Elastic builds and maintains. You may need to contact them directly for further assistance.

(This is an automated response from your friendly Elastic bot. Please report this post if you have any suggestions or concerns :elasticheart: )

What kind of error? If Logstash receives an error from your destination Elasticsearch, then depending on the error code it may retry, or send the message to a Dead Letter Queue (DLQ) if you have one configured; if not, it will drop the messages.

So, if it is a retryable error on the destination, Logstash will retry; if it is a non-retryable error, it depends on whether you have a DLQ or not. You can read more about the DLQ here.
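For reference, the DLQ is enabled in logstash.yml and only applies to the elasticsearch output; a minimal sketch, with the path as a placeholder:

        dead_letter_queue.enable: true
        path.dead_letter_queue: "/path/to/dlq"    # optional, defaults to a dir under path.data
        dead_letter_queue.max_bytes: 1024mb       # size cap per pipeline

Events written there can later be replayed with the dead_letter_queue input plugin.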

But if it is an error at the source, like the connection between Logstash and the source being interrupted, you will need to start again.

Similar to the first question: if it is a network issue with the destination, you can also use a persistent queue (PQ) to temporarily store the data on disk. If the destination is unreachable, Logstash will keep the events in the PQ until it is full; once the PQ is full, Logstash will stop accepting new events from the input. The documentation for the PQ is here.
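Enabling the PQ is also a logstash.yml change; a minimal sketch, with the sizes as examples you would tune:

        queue.type: persisted
        queue.max_bytes: 8gb            # disk budget; the input is paused once this fills up
        path.queue: "/path/to/queue"    # optional, defaults to a dir under path.data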

Logstash does not deal with duplicates; you need to handle that yourself while storing the data. In this case, where the output is Elasticsearch, this can be done using the document_id option in the output, configured to use a field in your document as the _id, probably [@metadata][_id], but you will need to check that.

You would need to add these options to your Elasticsearch output:

        document_id => "%{field with the unique id}"
        action => "update"
        doc_as_upsert => true
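Note that with the opensearch input and docinfo => true, the document id may end up nested under [@metadata][input][opensearch][_id] rather than [@metadata][_id], depending on the plugin version and its docinfo_target setting; the rubydebug output further down this thread shows exactly where it lands.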

Logstash makes requests to Elasticsearch in bulk. The default bulk size is 125 events, but this can be changed via the pipeline.batch.size setting for your pipeline. This value is per worker, and by default Logstash uses one worker per CPU core, so if you have 4 cores your effective batch size is 125 * 4.
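These settings live outside the pipeline config itself; a sketch in pipelines.yml, where the id, path, and values are examples:

        - pipeline.id: opensearch-migration
          path.config: "/path/to/migration.conf"
          pipeline.workers: 4          # defaults to the number of CPU cores
          pipeline.batch.size: 500     # events per worker per bulk request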

Similar to questions 1 and 2: if it is a retryable error it will be retried; if not, it will be sent to the DLQ, or dropped if no DLQ is configured.

Both cases show up in the Logstash logs.

A follow-up question: do we have documentation of which error codes are retryable and which are not?

Yes, the documentation covers retries at the bulk API level, as well as at the document level.
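In short, per those docs: at the request level, anything other than a 200 from the bulk API (including 429) is retried, while at the document level, 400 and 404 responses are treated as non-retryable and are sent to the DLQ if one is enabled, or logged and dropped otherwise.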

While migrating the cluster from OpenSearch 1.1 to Elasticsearch 7.17 using Logstash 8.10.0 with this config:

input {
    opensearch {
        hosts => ["url"]
        user => "<user>"
        password => "<password>"
        index => "*,-.monitoring*,-.security*,-.kibana*"
        size => 100
        scroll => "1m"
        query =>  '{ "query": { "match_all": {}} }'
        docinfo => true
    }
}
filter {
}
output {
    elasticsearch {
        hosts => ["<elastic-cloud-url>"]
        user => "<username>"
        password => "<password>"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }
    stdout { codec => rubydebug { metadata => true } }
}

none of the data is migrated, and I see a lot of occurrences of the warning below:

[2023-10-04T17:28:55,762][WARN ][logstash.outputs.elasticsearch][main][2359072d0f5e3878d59852077bc2b6f6faf10164ddcfac229a6212969f7aa047] Badly formatted index, after interpolation still contains placeholder: [%{[@metadata][_index]}]; event: `{"@timestamp"=>2023-10-04T17:28:55.383792559Z, "environments"=>["devrealm2"], "@metadata"=>{"input"=>{"opensearch"=>{"_id"=>"b1b3bd28-069a-422a-a86a-484fd4e7fd46", "_type"=>"_doc", "_index"=>"asset_import_history"}}}, "import_id"=>"b1b3bd28-069a-422a-a86a-484fd4e7fd46", "@version"=>"1", "import_time"=>1683534990189, "import_status"=>"passed",

Strangely, I don't see any errors in the logs, just these warnings.

What could be the reason for these warnings, and why is the data migration failing?

Some (or even all) of your events are missing the [@metadata][_index] field, so the string interpolation does not happen and the resulting index name is invalid.

How should I fix this? What would be the correct output configuration to migrate the cluster from source to destination?

I do see the [@metadata][_index] field from the imported source; example below:

{
        "@timestamp" => 2023-10-05T05:22:32.315431290Z,
      "environments" => [
        [0] "realm1"
    ],
         "@metadata" => {
        "input" => {
            "opensearch" => {
                   "_id" => "a15011ec-d8d5-478e-834a-c8d71645fe5a",
                 "_type" => "_doc",
                "_index" => "import_history"
            }
        }
    },
         "import_id" => "a15011ec-d8d5-478e-834a-c8d71645fe5a",
          "@version" => "1",
       "import_time" => 1668572982592,
     "import_status" => "passed",

No, you have a [@metadata][input][opensearch][_index] field.
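Based on the rubydebug output you posted, something like this should interpolate correctly (an untested sketch; hosts and credentials as in your config, with document_type dropped since it is deprecated on 7.x targets):

output {
    elasticsearch {
        hosts => ["<elastic-cloud-url>"]
        user => "<username>"
        password => "<password>"
        index => "%{[@metadata][input][opensearch][_index]}"
        document_id => "%{[@metadata][input][opensearch][_id]}"
        ilm_enabled => false
        manage_template => false
    }
}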

Hi @Badger

Thanks for your response.
I see some more errors in the logs. The whole process just prints these errors and no data gets migrated. Can you please suggest what could be wrong here and how to solve it?

[2023-10-05T19:11:50,743][WARN ][logstash.outputs.elasticsearch][main][19242e55d5969beeb4c14f4c82c3f10fb465ec2c5f9d239a2d1f2a16eb569256] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2023.10.05", :routing=>nil}, {"run_id"=>"94662920-60c9-4ba3-be47-9999c54f4841", "start_time"=>1686055737551, "message"=>"Error in pre-processing. Error: Prev Cycle is running... Aborting", "edr_time_range"=>{"to"=>1686055437551, "from"=>1686054314000}, "account_id"=>"AllData", "time_taken"=>68, "status"=>"failed", "end_time"=>1686055737619, "@version"=>"1", "@timestamp"=>2023-10-05T19:11:50.255Z}], :response=>{"index"=>{"_index"=>"logstash-2023.10.05", "_type"=>"_doc", "_id"=>"GOBAAYsB3KwHh5mi38cd", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [status] of type [boolean] in document with id 'GOBAAYsB3KwHh5mi38cd'. Preview of field's value: 'failed'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Failed to parse value [failed] as only [true] or [false] are allowed."}}}}}
[2023-10-05T19:11:51,949][WARN ][logstash.outputs.elasticsearch][main][19242e55d5969beeb4c14f4c82c3f10fb465ec2c5f9d239a2d1f2a16eb569256] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2023.10.05", :routing=>nil}, {"rule_name"=>"UHMSanityRule_devrealm2", "tenant_name"=>"devrealm2", "exec_date"=>"2022-03-10 08:00:03.5", "job_id"=>"d6580c7b-a542-4309-bb23-9679215f3ae3", "status"=>"Violation Count = 1", "@version"=>"1", "@timestamp"=>2023-10-05T19:11:50.864Z}], :response=>{"index"=>{"_index"=>"logstash-2023.10.05", "_type"=>"_doc", "_id"=>"nkpAAYsBz4-erieX4nWr", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [status] of type [boolean] in document with id 'nkpAAYsBz4-erieX4nWr'. Preview of field's value: 'Violation Count = 1'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Failed to parse value [Violation Count = 1] as only [true] or [false] are allowed."}}}}}
[2023-10-05T19:11:52,009][WARN ][logstash.outputs.elasticsearch][main][19242e55d5969beeb4c14f4c82c3f10fb465ec2c5f9d239a2d1f2a16eb569256] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2023.10.05", :routing=>nil}, {"run_id"=>"b2169500-323b-42e7-97a4-24ee9d5ec349", "start_time"=>1686052800049, "message"=>"Error in pre-processing. Error: Prev Cycle is running... Aborting", "edr_time_range"=>{"to"=>1686052500049, "from"=>1685734825}, "account_id"=>"AllData", "time_taken"=>198, "status"=>"failed", "end_time"=>1686052800247, "@version"=>"1", "@timestamp"=>2023-10-05T19:11:50.797Z}], :response=>{"index"=>{"_index"=>"logstash-2023.10.05", "_type"=>"_doc", "_id"=>"j0pAAYsBz4-erieX4nWp", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [status] of type [boolean] in document with id 'j0pAAYsBz4-erieX4nWp'. Preview of field's value: 'failed'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Failed to parse value [failed] as only [true] or [false] are allowed."}}}}}
[2023-10-05T19:11:52,145][WARN ][logstash.outputs.elasticsearch][main][19242e55d5969beeb4c14f4c82c3f10fb465ec2c5f9d239a2d1f2a16eb569256] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2023.10.05", :routing=>nil}, {"run_id"=>"86b9bf1a-9ca0-40d0-9559-b4bff1e867d5", "start_time"=>1686053700237, "message"=>"Completed Successfully", "account_id"=>"AllData", "time_taken"=>35, "operation"=>"Post-Processing", "end_time"=>1686053700272, "status"=>"success", "@version"=>"1", "@timestamp"=>2023-10-05T19:11:50.387Z}], :response=>{"index"=>{"_index"=>"logstash-2023.10.05", "_type"=>"_doc", "_id"=>"GeBAAYsB3KwHh5mi4se8", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [status] of type [boolean] in document with id 'GeBAAYsB3KwHh5mi4se8'. Preview of field's value: 'success'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Failed to parse value [success] as only [true] or [false] are allowed."}}}}}
[2023-10-05T19:11:52,146][WARN ][logstash.outputs.elasticsearch][main][19242e55d5969beeb4c14f4c82c3f10fb465ec2c5f9d239a2d1f2a16eb569256] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2023.10.05", :routing=>nil}, {"rule_name"=>"mail fix test", "tenant_name"=>"devrealm2", "exec_date"=>"2022-03-17 04:00:08.6", "job_id"=>"fa839997-8271-484e-896b-9b1a1f9b9cd6", "status"=>"Violation Count = 1", "@version"=>"1", "@timestamp"=>2023-10-05T19:11:50.470Z}], :response=>{"index"=>{"_index"=>"logstash-2023.10.05", "_type"=>"_doc", "_id"=>"H-BAAYsB3KwHh5mi4se8", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [status] of type [boolean] in document with id 'H-BAAYsB3KwHh5mi4se8'. Preview of field's value: 'Violation Count = 1'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Failed to parse value [Violation Count = 1] as only [true] or [false] are allowed."}}}}}

You have a [status] field, and the mapping that Elasticsearch is using says it is a boolean, so it can only be true or false. You are trying to index a document where [status] has the value "failed", which is not a boolean value, so Elasticsearch rejects it.

You may have set the index mapping with a template, or you may be using dynamic mapping. Are any documents in the index, or is it completely empty? If there are no documents at all then the issue is most likely in the template.
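To find where that boolean mapping comes from, you can inspect the destination cluster; a sketch, run from Kibana Dev Tools:

        GET logstash-2023.10.05/_mapping/field/status
        GET _template/logstash*
        GET _index_template

The first shows the current mapping of [status]; the other two list the legacy and composable index templates that might be applying it.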

The error is being thrown for the index "logstash-2023.10.05", which the Logstash output plugin creates at runtime, so I am not sure where exactly the status field is defined and how I can change it.

If it is a system index, can this be ignored?
Also, what does this error mean: "Error in pre-processing. Error: Prev Cycle is running... Aborting"

Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2023.10.05", :routing=>nil}, {"run_id"=>"94662920-60c9-4ba3-be47-9999c54f4841", "start_time"=>1686055737551, "message"=>"Error in pre-processing. Error: Prev Cycle is running... Aborting", "edr_time_range"=>{"to"=>1686055437551, "from"=>1686054314000}, "account_id"=>"AllData", "time_taken"=>68, "status"=>"failed", "end_time"=>1686055737619, "@version"=>"1", "@timestamp"=>2023-10-05T19:11:50.255Z}], :response=>{"index"=>{"_index"=>"logstash-2023.10.05", "_type"=>"_doc", "_id"=>"GOBAAYsB3KwHh5mi38cd", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [status] of type [boolean] in document with id 'GOBAAYsB3KwHh5mi38cd'. Preview of field's value: 'failed'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Failed to parse value [failed] as only [true] or [false] are allowed."}}}}}

I am not certain, but I think that is the [message] field of the event you are trying to index, so it is not a Logstash question.
