Logstash number of threads keeps increasing until crash

Hi,
I'm testing the following pipeline (stack version 8.4.1):

input { google_cloud_storage {...} } 
filter { csv {...} mutate {...} }
output { elasticsearch {...} }

with a large number of events (3M+).
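For context, a minimal sketch of what such a pipeline might look like (all option values here are hypothetical placeholders, not my actual settings):

```
input {
  google_cloud_storage {
    bucket_id     => "my-bucket"          # hypothetical bucket name
    json_key_file => "/path/to/key.json"  # hypothetical credentials path
  }
}
filter {
  csv {
    separator => ","
  }
  mutate {
    remove_field => ["message"]
  }
}
output {
  elasticsearch {
    hosts => ["https://localhost:9200"]   # hypothetical cluster address
  }
}
```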
After a few minutes, the number of threads climbs past 18,700 and Logstash crashes with:

[689.902s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
[689.902s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Ruby-0-Fiber-18666"
[2022-09-01T11:52:51,964][FATAL][org.logstash.Logstash    ][org_dataset] uncaught error (in thread [org_dataset]>worker0)
java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached

I've tried every recipe I could find on the subject, but nothing changes (not even the thread count at which the crash occurs).

Does anyone have a hint about the issue?

Looking at the thread dump, I can see that the following thread is the one multiplying itself (in WAITING state):

Fiber thread for block at: uri:classloader:/jruby/kernel/enumerator.rb:62

stackTrace:
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17.0.4/Native Method)
- parking to wait for <0x000000071726d660> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(java.base@17.0.4/LockSupport.java:341)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@17.0.4/AbstractQueuedSynchronizer.java:506)
at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.4/ForkJoinPool.java:3463)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.4/ForkJoinPool.java:3434)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@17.0.4/AbstractQueuedSynchronizer.java:1623)
at java.util.concurrent.ArrayBlockingQueue.take(java.base@17.0.4/ArrayBlockingQueue.java:420)
at org.jruby.ext.fiber.FiberQueue$1.run(FiberQueue.java:55)
at org.jruby.ext.fiber.FiberQueue$1.run(FiberQueue.java:52)
at org.jruby.RubyThread.executeTask(RubyThread.java:1712)
at org.jruby.RubyThread.executeTaskBlocking(RubyThread.java:1686)
at org.jruby.ext.fiber.FiberQueue.pop(FiberQueue.java:91)
at org.jruby.ext.fiber.ThreadFiber.exchangeWithFiber(ThreadFiber.java:109)
at org.jruby.ext.fiber.ThreadFiber.yield(ThreadFiber.java:222)
at org.jruby.ext.fiber.ThreadFiber.yield(ThreadFiber.java:229)
at org.jruby.ext.fiber.ThreadFiber$INVOKER$s$yield.call(ThreadFiber$INVOKER$s$yield.gen)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:75)
at uri_3a_classloader_3a_.jruby.kernel.enumerator.invokeOther0:yield(uri:classloader:/jruby/kernel/enumerator.rb:65)
at uri_3a_classloader_3a_.jruby.kernel.enumerator.RUBY$block$initialize$2(uri:classloader:/jruby/kernel/enumerator.rb:65)
at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java.base@17.0.4/DirectMethodHandle$Holder)
at java.lang.invoke.LambdaForm$MH/0x00000008012eec00.invoke(java.base@17.0.4/LambdaForm$MH)
at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(java.base@17.0.4/DelegatingMethodHandle$Holder)
at java.lang.invoke.LambdaForm$MH/0x00000008012ec400.guard(java.base@17.0.4/LambdaForm$MH)
at java.lang.invoke.Invokers$Holder.linkToCallSite(java.base@17.0.4/Invokers$Holder)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.RUBY$method$emit_row$0(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:1122)
at java.lang.invoke.LambdaForm$DMH/0x0000000801ec8c00.invokeStatic(java.base@17.0.4/LambdaForm$DMH)
at java.lang.invoke.LambdaForm$MH/0x000000080128c400.invokeExact_MT(java.base@17.0.4/LambdaForm$MH)
at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:165)
at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:185)
at org.jruby.runtime.callsite.RefinedCachingCallSite.call(RefinedCachingCallSite.java:95)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.invokeOther49:emit_row(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:871)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.RUBY$block$parse_quotable_loose$1(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:871)
at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java.base@17.0.4/DirectMethodHandle$Holder)
at java.lang.invoke.LambdaForm$MH/0x00000008012eec00.invoke(java.base@17.0.4/LambdaForm$MH)
at java.lang.invoke.DelegatingMethodHandle$Holder.delegate(java.base@17.0.4/DelegatingMethodHandle$Holder)
at java.lang.invoke.LambdaForm$MH/0x00000008012ec400.guard(java.base@17.0.4/LambdaForm$MH)
at java.lang.invoke.Invokers$Holder.linkToCallSite(java.base@17.0.4/Invokers$Holder)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.RUBY$block$each_line$1(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:52)
at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java.base@17.0.4/DirectMethodHandle$Holder)
at java.lang.invoke.LambdaForm$MH/0x0000000801ec8400.invoke(java.base@17.0.4/LambdaForm$MH)
at java.lang.invoke.LambdaForm$MH/0x0000000800cad000.invokeExact_MT(java.base@17.0.4/LambdaForm$MH)
at org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:151)
at org.jruby.runtime.IRBlockBody.yieldSpecific(IRBlockBody.java:89)
at org.jruby.runtime.Block.yieldSpecific(Block.java:166)
at org.jruby.util.StringSupport.rbStrEnumerateLines(StringSupport.java:2057)
at org.jruby.util.StringSupport.rbStrEnumerateLines(StringSupport.java:1958)
at org.jruby.RubyString.each_line(RubyString.java:5944)
at org.jruby.RubyString$INVOKER$i$each_line.call(RubyString$INVOKER$i$each_line.gen)
at org.jruby.runtime.callsite.RefinedCachingCallSite.call(RefinedCachingCallSite.java:95)
at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:192)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.invokeOther5:each_line(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:49)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.RUBY$method$each_line$0(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:49)
at java.lang.invoke.LambdaForm$DMH/0x0000000801ec8000.invokeStatic(java.base@17.0.4/LambdaForm$DMH)
at java.lang.invoke.LambdaForm$MH/0x000000080128c400.invokeExact_MT(java.base@17.0.4/LambdaForm$MH)
at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:165)
at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:185)
at org.jruby.runtime.callsite.RefinedCachingCallSite.call(RefinedCachingCallSite.java:95)
at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:192)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.invokeOther54:each_line(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:823)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.RUBY$method$parse_quotable_loose$0(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:823)
at java.lang.invoke.LambdaForm$DMH/0x0000000801ec7c00.invokeStatic(java.base@17.0.4/LambdaForm$DMH)
at java.lang.invoke.Invokers$Holder.invokeExact_MT(java.base@17.0.4/Invokers$Holder)
at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:152)
at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:148)
at org.jruby.runtime.callsite.RefinedCachingCallSite.call(RefinedCachingCallSite.java:71)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.invokeOther26:parse_quotable_loose(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:336)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.RUBY$method$parse$0(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:336)
at usr.share.logstash.vendor.jruby.lib.ruby.stdlib.csv.parser.RUBY$method$parse$0$__VARARGS__(/usr/share/logstash/vendor/jruby/lib/ruby/stdlib/csv/parser.rb:318)
at java.lang.invoke.LambdaForm$DMH/0x0000000801ebd400.invokeStatic(java.base@17.0.4/LambdaForm$DMH)
at java.lang.invoke.LambdaForm$MH/0x000000080128c400.invokeExact_MT(java.base@17.0.4/LambdaForm$MH)
at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)
at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)
at org.jruby.RubyClass.finvokeWithRefinements(RubyClass.java:512)
at org.jruby.RubyBasicObject.send(RubyBasicObject.java:1733)
at org.jruby.RubyBasicObject$INVOKER$i$send.call(RubyBasicObject$INVOKER$i$send.gen)
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:85)
at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:94)
at uri_3a_classloader_3a_.jruby.kernel.enumerator.invokeOther3:__send__(uri:classloader:/jruby/kernel/enumerator.rb:64)
at uri_3a_classloader_3a_.jruby.kernel.enumerator.RUBY$block$initialize$1(uri:classloader:/jruby/kernel/enumerator.rb:64)
at java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(java.base@17.0.4/DirectMethodHandle$Holder)
at java.lang.invoke.LambdaForm$MH/0x0000000801ec5800.invoke(java.base@17.0.4/LambdaForm$MH)
at java.lang.invoke.LambdaForm$MH/0x0000000800cad000.invokeExact_MT(java.base@17.0.4/LambdaForm$MH)
at org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:151)
at org.jruby.runtime.IRBlockBody.yieldSpecific(IRBlockBody.java:72)
at org.jruby.runtime.Block.yieldSpecific(Block.java:157)
at org.jruby.ext.fiber.ThreadFiber.lambda$createThread$0(ThreadFiber.java:302)
at org.jruby.ext.fiber.ThreadFiber$$Lambda$1482/0x0000000801e556e0.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.4/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.4/ThreadPoolExecutor.java:635)
at java.lang.Thread.run(java.base@17.0.4/Thread.java:833)
Locked ownable synchronizers:
- <0x0000000713609ab8> (a java.util.concurrent.ThreadPoolExecutor$Worker)

Do you have a lot of files in the buckets of your google_cloud_storage input?

I've had some issues with this plugin in the past when the bucket has a lot of files, because it needs to list everything every time it reads from the bucket.

This plugin also has not been updated since 2019; it was developed by someone at Google, but it seems abandoned now.

There's only one file, but it is 3.4 GB and 30M lines. Logstash seems to download it fine; it's when it starts reading events from its local copy of the file that it crashes with the above exception.

What is the size of the Java heap for Logstash? Have you tried increasing it?

Also, have you tried downloading the file manually and using the file input plugin to read it? I would suggest trying that to see whether the issue is in the input plugin or in Logstash core.
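A minimal test pipeline along those lines might look like this (the path and options are hypothetical; the idea is just to take the google_cloud_storage input out of the equation):

```
input {
  file {
    path         => "/tmp/test-data.csv"  # hypothetical local copy of the object
    mode         => "read"                # read the file once, like a batch job
    sincedb_path => "/dev/null"           # don't persist read position for a one-off test
  }
}
filter {
  csv { }
}
output {
  stdout { codec => dots }                # cheap output, rules out elasticsearch as a factor
}
```

If the thread count still explodes with this config, the input plugin is cleared and the problem is in the filter stage or Logstash core.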

I'm not sure how the google_cloud_storage plugin works, whether it reads from the temporary file or loads it into memory.

I've played around with the heap and stack sizes with no change in behavior (not even in the thread count it reaches before the crash).
I did as you suggested and tested with a file input instead: same behavior.

So I ran the same pipeline (with a file input) on Logstash 7.17, and it works like a charm: the thread count stays around 30. On 8.4 it keeps increasing past 18,600 and crashes. Then I tested 8.3.0: it works too (thread count stays around 30).
That narrows it down to an issue in 8.4, doesn't it?

@Alain_Bod @leandrojmp

I just experienced something very similar with an 8.4.0 / 8.4.1 pipeline. A pipeline I had run many times from 7.x through 8.3.3 is suddenly crashing in 8.4.0 / 8.4.1, complaining about resources.

This is a single CSV File with about 2M records.

Interestingly, mine uses the CSV filter as well... hmm.

I've checked in with engineering.

@Alain_Bod I would probably revert to 7.17 or try 8.3.3.

I see the exact same error message on my Mac:

[2022-09-01T11:52:51,964][FATAL][org.logstash.Logstash    ][org_dataset] uncaught error (in thread [org_dataset]>worker0)
java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached

Then I ran 8.3.3: it loads about 2 million records and it's fine.

This looks like a regression to me.


@Alain_Bod

Yup, this is a regression identified by engineering. I don't have a GitHub issue yet, so you will need to downgrade to 8.3.3 or earlier.

BUT it did get me thinking. I tested the dissect filter and it works, and it is much faster for my 2M rows, about twice as fast, so you could convert to it if you wanted. I have been told that in general dissect is more efficient than csv.


I'm suffering from the same issue with the same config: the input is a CSV file, the filter is the CSV one, and the output goes to Elasticsearch (and stdout {}). The error occurs since 8.4.x; previous versions worked like a charm. However, my file is much smaller (only about 2,000 lines) and it crashes with fewer threads:

[20.486s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
[20.486s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Ruby-0-Fiber-991"
{"level":"FATAL","loggerName":"org.logstash.Logstash","timeMillis":1662621252494,"thread":"[postalcode-csv]>worker0","logEvent":{"message":"uncaught error (in thread [postalcode-csv]>worker0)"}}

I've already reported it to the support team, but wanted to add to the discussion anyway.


@seb.sch @Alain_Bod @leandrojmp

Here is the issue; the fix will be backported to 8.4.

From engineering:

"Unfortunately, this was an issue with the version of jruby and the csv gem it ships with, and will require an upgrade to logstash itself to fix, and not just a plugin upgrade"

I will re-add... dissect rocks :slight_smile:

dissect rocks when you are sure of your input format, right?
With CSV, the quote character may or may not be present depending on the field; how do you handle that with dissect?

Yeah, dissect requires a little more "understanding" of the data.

You would need to know where the quotes are ahead of time and add them to the dissect pattern, so it does not fit every use case.

Something like this (note the single quotes):

filter {
  dissect {
    mapping => {
      "message" => '%{name},"%{addr1}",%{addr2},%{addr3},%{city},%{zip}'
    }
  }
}
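For comparison, the csv filter equivalent of that mapping (using the same hypothetical column names) would be:

```
filter {
  csv {
    columns => ["name", "addr1", "addr2", "addr3", "city", "zip"]
    # the csv filter strips surrounding quotes automatically,
    # regardless of which fields happen to be quoted
  }
}
```

That flexibility around quoting is exactly what you give up (in exchange for speed) when converting to dissect.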

Hi, do you have a way to make it work on Debian 10 with 8.4? Or should I downgrade while waiting (but waiting for what? ;) )

Thanks

Hi @Henri_L

No easy workaround other than using dissect.

Looks like the fix is merged and marked for 8.4.2, which should be out soon.

Thanks, I will wait for 8.4.2 and in the meantime I'll downgrade to 8.3.3 :wink: