S3 input plugin OOM (heap space)

Hi,

I have a big issue with the S3 input plugin. I'm using it to restore files created using the s3 output plugin using gzip compression.

When trying to restore files that are larger than 100M, it basically eats all of the JVM heap and crashes. I've increased the heap up to 12G, but I'm still unable to process a 1Gb input file, which is my split strategy for the output.

Here are the logs of the Logstash process that crashes:

Sending Logstash logs to /usr/share/logstash/logs which is now configured via log4j2.properties
[2020-08-17T15:16:43,080][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.8.0", "jruby.version"=>"jruby 9.2.11.1 (2.5.7) 2020-03-25 b1f55b1a40 OpenJDK 64-Bit Server VM 11.0.7+10-LTS on 11.0.7+10-LTS +indy +jit [linux-x86_64]"}
[2020-08-17T15:16:44,905][INFO ][org.reflections.Reflections] Reflections took 28 ms to scan 1 urls, producing 21 keys and 41 values
[2020-08-17T15:16:52,450][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>10, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>10, "pipeline.sources"=>["/tmp/restore-s3.conf"], :thread=>"#<Thread:0xf2bf445 run>"}
[2020-08-17T15:16:52,954][INFO ][logstash.inputs.s3       ][main] Registering {:bucket=>"tagpay-logs", :region=>"fr-par"}
[2020-08-17T15:16:53,122][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-08-17T15:16:53,183][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-08-17T15:16:53,347][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-08-17T15:16:55,047][INFO ][logstash.inputs.s3       ][main][6f0985cd390ec160d264e507e9481323efd4aaef8ea3733bad946c3e4074a992] Using default generated file for the sincedb {:filename=>"/usr/share/logstash/data/plugins/inputs/s3/sincedb_89b7d960b5ee8f8d91196f28f27d1ad0"}
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid7632.hprof ...
Heap dump file created [1605148657 bytes in 4.435 secs]
warning: thread "[main]<s3" terminated with exception (report_on_exception is true):
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(java/util/Arrays.java:3745)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(java/lang/AbstractStringBuilder.java:172)
        at java.lang.AbstractStringBuilder.append(java/lang/AbstractStringBuilder.java:686)
        at java.lang.StringBuilder.append(java/lang/StringBuilder.java:228)
        at java.io.BufferedReader.readLine(java/io/BufferedReader.java:372)
        at java.io.BufferedReader.readLine(java/io/BufferedReader.java:392)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)
        at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:426)
        at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:293)
        at java.lang.invoke.LambdaForm$DMH/0x0000000840669c40.invokeVirtual(java/lang/invoke/LambdaForm$DMH)
        at java.lang.invoke.LambdaForm$MH/0x00000008406b2440.invoke(java/lang/invoke/LambdaForm$MH)
        at java.lang.invoke.Invokers$Holder.linkToCallSite(java/lang/invoke/Invokers$Holder)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.read_gzip_file(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb:298)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.RUBY$method$read_gzip_file$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_input_minus_s3_minus_3_dot_5_dot_0/lib/logstash/inputs//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb)
        at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java/lang/invoke/DirectMethodHandle$Holder)
        at java.lang.invoke.LambdaForm$MH/0x000000084066b840.invokeExact_MT(java/lang/invoke/LambdaForm$MH)
        at java.lang.invoke.LambdaForm$DMH/0x0000000840669c40.invokeVirtual(java/lang/invoke/LambdaForm$DMH)
        at java.lang.invoke.LambdaForm$MH/0x00000008406db840.invoke(java/lang/invoke/LambdaForm$MH)
        at java.lang.invoke.LambdaForm$MH/0x00000008406dbc40.linkToCallSite(java/lang/invoke/LambdaForm$MH)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.read_file(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb:277)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.RUBY$method$read_file$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_input_minus_s3_minus_3_dot_5_dot_0/lib/logstash/inputs//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb)
        at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java/lang/invoke/DirectMethodHandle$Holder)
        at java.lang.invoke.LambdaForm$MH/0x000000084066b840.invokeExact_MT(java/lang/invoke/LambdaForm$MH)
warning: thread "[main]-pipeline-manager" terminated with exception (report_on_exception is true):
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(java/util/Arrays.java:3745)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(java/lang/AbstractStringBuilder.java:172)
        at java.lang.AbstractStringBuilder.append(java/lang/AbstractStringBuilder.java:686)
        at java.lang.StringBuilder.append(java/lang/StringBuilder.java:228)
        at java.io.BufferedReader.readLine(java/io/BufferedReader.java:372)
        at java.io.BufferedReader.readLine(java/io/BufferedReader.java:392)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)
        at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:426)
        at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:293)
        at java.lang.invoke.LambdaForm$DMH/0x0000000840669c40.invokeVirtual(java/lang/invoke/LambdaForm$DMH)
        at java.lang.invoke.LambdaForm$MH/0x00000008406b2440.invoke(java/lang/invoke/LambdaForm$MH)
        at java.lang.invoke.Invokers$Holder.linkToCallSite(java/lang/invoke/Invokers$Holder)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.read_gzip_file(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb:298)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.RUBY$method$read_gzip_file$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_input_minus_s3_minus_3_dot_5_dot_0/lib/logstash/inputs//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb)
        at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java/lang/invoke/DirectMethodHandle$Holder)
        at java.lang.invoke.LambdaForm$MH/0x000000084066b840.invokeExact_MT(java/lang/invoke/LambdaForm$MH)
        at java.lang.invoke.LambdaForm$DMH/0x0000000840669c40.invokeVirtual(java/lang/invoke/LambdaForm$DMH)
        at java.lang.invoke.LambdaForm$MH/0x00000008406db840.invoke(java/lang/invoke/LambdaForm$MH)
        at java.lang.invoke.LambdaForm$MH/0x00000008406dbc40.linkToCallSite(java/lang/invoke/LambdaForm$MH)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.read_file(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb:277)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.RUBY$method$read_file$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_input_minus_s3_minus_3_dot_5_dot_0/lib/logstash/inputs//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb)
        at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java/lang/invoke/DirectMethodHandle$Holder)
        at java.lang.invoke.LambdaForm$MH/0x000000084066b840.invokeExact_MT(java/lang/invoke/LambdaForm$MH)
        at java.lang.invoke.LambdaForm$DMH/0x0000000840669c40.invokeVirtual(java/lang/invoke/LambdaForm$DMH)
        at java.lang.invoke.LambdaForm$MH/0x00000008406db840.invoke(java/lang/invoke/LambdaForm$MH)
        at java.lang.invoke.LambdaForm$MH/0x00000008406dbc40.linkToCallSite(java/lang/invoke/LambdaForm$MH)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.read_file(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb:277)
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_s3_minus_3_dot_5_dot_0.lib.logstash.inputs.s3.RUBY$method$read_file$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_input_minus_s3_minus_3_dot_5_dot_0/lib/logstash/inputs//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-s3-3.5.0/lib/logstash/inputs/s3.rb)
        at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java/lang/invoke/DirectMethodHandle$Holder)
        at java.lang.invoke.LambdaForm$MH/0x000000084066b840.invokeExact_MT(java/lang/invoke/LambdaForm$MH)
[2020-08-17T15:17:25,317][ERROR][org.logstash.Logstash    ] java.lang.OutOfMemoryError: Java heap space

So the issue seems to be in the gzip decompression. I've seen that it's using a BufferedReader without a buffer size, so maybe it just eats everything until it dies.

Here is the s3 output configuration:

output {
  s3 {
    access_key_id => "{{ logstash_s3_access_key }}"
    secret_access_key => "{{ logstash_s3_secret }}"
    endpoint => "{{ logstash_s3_endpoint }}"
    region => "{{ logstash_s3_region }}"
    bucket => "{{ logstash_s3_bucket }}"
    canned_acl => "private"
    restore => "true"
    temporary_directory => "/data"
    prefix => "%{[@metadata][outputpath]}" 
    size_file => "268435456"
    time_file => "60"
    storage_class => "STANDARD"
    codec => "json"
    encoding => "gzip"
    additional_settings => {
      "force_path_style" => true
    }
    retry_count => "30"
    upload_workers_count => "1"
    upload_queue_size => "1"
  }
}

For the reader Logstash, here is its configuration:

http.host: "0.0.0.0"
monitoring.enabled: false

pipeline.workers: "1"
pipeline.ordered: false
pipeline.batch.size: 10

queue.type: persisted
path.queue: "/tmp/queue"
queue.max_bytes: 10g

And the input configuration / pipeline:

input {
  s3 {
    access_key_id => "XXXXXX"
    secret_access_key => "XXXXX"
    endpoint => "..."
    region => "fr-par"
    bucket => "xxxx"


    prefix => "XXXXX/2020/08/17/"

    # we keep the data in JSON format
    codec => "json"
    additional_settings => {
      "force_path_style" => true
    }
    delete => "false"
    watch_for_new_files => "false"
  }
}

filter {

  # decrypt the "wrapped" field in place
  cipher {
    mode => "decrypt"
    source => "wrapped"
    target => "wrapped"
    algorithm => "aes-256-cbc"
    iv_random_length => 16
    key => "xxxxx"
    key_size => 32
    cipher_padding => 1
    base64 => true
  }

  # Move all fields of the decrypted "wrapped" JSON document onto the event itself, then drop "wrapped"
  ruby {
    code => '
      p = event.get("wrapped")
      j = JSON.parse(p)
      j.to_hash.each { |k,v|
        if ["@timestamp"].include?(k)
          event.set(k, LogStash::Timestamp.parse_iso8601(v))
        else
          event.set(k, v)
        end
      }
      event.remove("wrapped")
    '
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.