On-prem to Cloud Via Logstash

I am moving my logs from an on-prem instance up to Elastic Cloud. I have set up a Logstash pipeline with a very large persistent queue, like below, to push events up to the Cloud.

input {
  elasticsearch {
    hosts => "https://elasic.contoso.net:9200"
    index => "some_index-2022"
    query => '{ "query": { "match_all": {} }, "_source": ["message", "@timestamp"] }'
  }
}
filter {
...custom enrichment...
}
output {
  elasticsearch {
    cloud_id => "xx-xx:xxx"
    api_key => "xxxx"
    data_stream => true
    ssl => true
  }
}

Logstash pulls the events very quickly at first, but then slows down to a trickle. Here's what the elasticsearch input looks like; it ingests events for about 4-5 hours.

[image: chart of the elasticsearch input's ingestion rate, starting fast and tapering off over 4-5 hours]

Any ideas why this happens, or anything I can do to shorten this... slope?

What is your pipeline configuration?

My expectation is that Logstash is running into one of three issues:

  1. input IO bottleneck
  2. output IO bottleneck
  3. (unlikely) backoff from Elasticsearch (429 response codes).

Logstash pipelines run x + n threads: x is the number of input {} blocks configured in the pipeline, and n is the number of workers assigned to your pipeline (pipelines.yml). The n workers handle the filter {} and output {} portion of your pipeline.

IO is expensive and usually slow, which is why splitting up inputs can benefit your processing speed, as they will work in parallel. The workers then handle both inputs in the pipeline with the same filter and output.

The output is again IO (network traffic), so it is slow; while one worker's call to the Cloud is waiting for a response, another worker can make its own call, creating parallel processing streams.

Play around with multiple inputs (x) and the number of workers assigned to the pipeline (n) to see if you can improve the throughput, keeping in mind the CPU size assigned to the node handling the pipeline, of course.
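As a rough sketch (the index names below are hypothetical, and auth/queue settings are omitted), two inputs in one pipeline would look something like this; the n workers are what pipeline.workers controls for the filter/output side:

input {
  # input thread 1
  elasticsearch {
    hosts => "https://elasic.contoso.net:9200"
    index => "some_index_a-2022"
    query => '{ "query": { "match_all": {} }, "_source": ["message", "@timestamp"] }'
  }
  # input thread 2, pulling a different index in parallel
  elasticsearch {
    hosts => "https://elasic.contoso.net:9200"
    index => "some_index_b-2022"
    query => '{ "query": { "match_all": {} }, "_source": ["message", "@timestamp"] }'
  }
}
# filter {} and output {} stay exactly as you have them; the n workers run them in parallel.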

Fantastic explanation! You started with the math and my brain went 'oh no...' and then you used words and it was all like 'awww yaaa'.

I have one additional input that grabs events sent from the Elastic Agent. My filter section consists of a couple basic mutate filters to put some information at the beginning of each message field and then I parse out the message field using a kv filter. From there I use a few filters to add my custom fields and then finally delete the parsed out fields. The parsed fields are stuck into an object so I just reference [temp] to delete the temp fields.

I'm running a single pipeline on a VM with 8 vCPUs assigned to it. Looks like under full load, Logstash tops out around 45% CPU usage.

  if [data_source] == "on-prem" {
    if [message] !~ /^<\d+?>date=.*/ {
      mutate {
        add_field => {
          "date" => "%{@timestamp}"
        }
      }
      mutate {
        gsub => [
          "date", '^(\d{4}-\d{2}-\d{2}).*', '\1'
        ]
      }
      mutate {
        gsub => [
          "message", '^', '<189>date=%{date} '
        ]
      }
      mutate {
        remove_field => [
          "date",
          "@timestamp"
        ]
      }
    }
  }
  if [data_stream][namespace] == "fortigate" {
    mutate {
      copy => {
        "message" => "temp_message"
      }
    }
    mutate {
      id => "Remove Numbered Date Fields"
      gsub => [
        "temp_message", "<.+>date=.+? ", ""
      ]
    }
    kv {
      source => "temp_message"
      target => "temp"
      exclude_keys => [
        "date",
        "time"
      ]
      include_brackets => false
      timeout_millis => 60000
    }
    if [temp][action] == "perf-stats" {
      csv {
        id => "Split Bandwidth Stats"
        columns => [ "[bandwidth_down]","[bandwidth_up]" ]
        separator => "/"
        source => "[temp][bandwidth]"
      }
    }
    if [temp][user] {
      mutate {
        lowercase => [ "[temp][user]" ]
        gsub => [
        "[temp][user]", "user1,|n/a|^\w+[\\]+|usercertauth,cn=|^cn ?= ?|dc.*?\"|[\\]+\"", ""
        ]
      }
      mutate {
        add_field => {
          "[source][user][full_name]" => "%{[temp][user]}"
        }
      }
      translate {
        id => "Certificate Users Name Conversion"
        target => "[source][user][full_name]"
        dictionary_path => ""
        source => "[source][user][full_name]"
        override => true
      }
      translate {
        id => "Department Lookup"
        target => "[source][user][department]"
        dictionary_path => ""
        source => "[source][user][full_name]"
        override => true
      }
      translate {
        target => "[source][user][full_name]"
        dictionary_path => ""
        source => "[source][user][full_name]"
        override => true
      }
      ruby {
        code => 'event.set("[source][user][full_name]", event.get("[source][user][full_name]").split.map(&:capitalize).join(" "))'
      }
      mutate {
        gsub => [
        "[source][user][full_name]", '^([\w -]*?), ([\w ]*?)$', '\2 \1'
        ]
      }
    }
    if [source][user][department] {
      ruby {
        code => 'event.set("[source][user][department]", event.get("[source][user][department]").split.map(&:capitalize).join(" "))'
      }
    }
    if [source][user][full_name] == '%{[temp][user]}' {
      mutate {
        remove_field => [ "[source][user][full_name]" ]
      }
    }
    mutate {
      remove_field => [
        "[temp]",
        "[temp_message]"
      ]
    }
  }

Looking at the pipeline metrics, it appears the output is having the largest impact. I'll bump my worker count up to 10 and report back.


I have written a blog that goes into detail about "slicing"; you might find it useful: OpenSearch migration: Migrating 1 billion log lines from OpenSearch to Elasticsearch for improved performance and efficiency | Elastic Blog
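For reference, sliced scrolling on the elasticsearch input is just the slices option (a minimal sketch reusing your input from above; the number of slices should generally not exceed the number of shards in the source index):

input {
  elasticsearch {
    hosts => "https://elasic.contoso.net:9200"
    index => "some_index-2022"
    query => '{ "query": { "match_all": {} }, "_source": ["message", "@timestamp"] }'
    slices => 2   # each slice is scrolled in parallel
  }
}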


Thanks for that. I read through it and, while it does explain why this tailing off occurs, it doesn't look like I can do much. While I have many indices to pull data from, I am doing a single index at a time, and each index only has a single shard. That would seem to rule out configuring slicing, as the documentation states that configuring more slices than shards can be detrimental. It seems like my only option may be to change the page size.

The default page size is 1000. If the tailing off of performance is because of page depth, and I set a page size of 2000, would that make the tail-off take twice as long to appear, since it would take twice as long to reach a given depth? Or am I misunderstanding something?
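For reference, I assume bumping it would just mean setting the size option on my existing input, something like:

input {
  elasticsearch {
    hosts => "https://elasic.contoso.net:9200"
    index => "some_index-2022"
    query => '{ "query": { "match_all": {} }, "_source": ["message", "@timestamp"] }'
    size => 2000   # scroll page size; the plugin default is 1000
  }
}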

If you are processing a single shard at a time, you can still "slice" using filter queries and several pipelines / Logstash instances.
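For example (a sketch that assumes splitting on @timestamp at an arbitrary mid-year date; each input goes in its own pipeline or Logstash instance, with the same filter and output as before):

# pipeline one: first half of the year
input {
  elasticsearch {
    hosts => "https://elasic.contoso.net:9200"
    index => "some_index-2022"
    query => '{ "query": { "range": { "@timestamp": { "lt": "2022-07-01" } } }, "_source": ["message", "@timestamp"] }'
  }
}

# pipeline two: second half of the year
input {
  elasticsearch {
    hosts => "https://elasic.contoso.net:9200"
    index => "some_index-2022"
    query => '{ "query": { "range": { "@timestamp": { "gte": "2022-07-01" } } }, "_source": ["message", "@timestamp"] }'
  }
}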

Ya, I guess I could set up additional pipelines and have them tackle indices in parallel. Thanks for the idea!

What is the configuration of your Elastic Cloud? Instance types and sizes?

...limited...

1 zone, 15GB RAM 7.9 vCPU (ARM)
1 zone, 46TB of frozen storage

The hot tier is good for about 8k eps though, which is almost twice our current production ingest rate.
