Logstash output plugin for Azure Data Explorer (ADX) quickly running out of capacity

Dear All,
I use the ADX output plugin to send FW logs to ADX.
I first tried it with a stdout{} output to massage the data and parse it my way, and that worked fine.
Now I have enabled the kusto output (that is what the ADX plugin is called) and started it manually from the command line with ./logstash -f my.conf. It also works fine at first, but because the log stream is huge (almost 120 FW appliances feeding in data), after a couple of minutes the ingestor queue capacity starts running low, drops to 0 free slots, and ingestion stops. Please see the log snippet below.

[INFO ] 2022-07-26 07:10:44.077 [[main]>worker5] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-07-10-43.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.078 [[main]>worker5] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-11-10-43.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.107 [[main]>worker4] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-05-10-44.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.340 [[main]>worker7] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-08-40-44.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.345 [[main]>worker1] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-04-10-43.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.538 [[main]>worker0] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-12-10-44.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.539 [[main]>worker0] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-11-10-44.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.721 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-11.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.746 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.747 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.747 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.748 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.748 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.749 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-11.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.749 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.749 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-03-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.750 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-03-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.750 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.750 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.750 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.751 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.751 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.753 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.755 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.756 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.756 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.756 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-03-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.756 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-15.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.757 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-15.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.757 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-06-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-15.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-17.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-03-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.759 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.759 [[main]>worker3] kusto - Ingestor queue capacity is running low with 3 free slots.
[INFO ] 2022-07-26 07:10:44.760 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-17.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.760 [[main]>worker3] kusto - Ingestor queue capacity is running low with 2 free slots.
[INFO ] 2022-07-26 07:10:44.760 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-17.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.760 [[main]>worker3] kusto - Ingestor queue capacity is running low with 1 free slots.
[INFO ] 2022-07-26 07:10:44.761 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.761 [[main]>worker3] kusto - Ingestor queue capacity is running low with 0 free slots.
[INFO ] 2022-07-26 07:10:44.761 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-17.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.762 [[main]>worker3] kusto - Ingestor queue capacity is running low with 0 free slots.
[INFO ] 2022-07-26 07:10:44.762 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.762 [[main]>worker3] kusto - Ingestor queue capacity is running low with 0 free slots.
[ERROR] 2022-07-26 07:12:41.928 [Kusto to ingest file: /tmp/kusto/2022-07-26-04-41-11.txt.testXXXMYTableXXX.testXXXMYTableXXX] kusto - Uploading failed, retrying. {:exception=>Java::ComMicrosoftAzureKustoIngestExceptions::IngestionClientException, :message=>"Failed to ingest from file", :path=>"/tmp/kusto/2022-07-26-04-41-11.txt.testXXXMYTableXXX.testXXXMYTableXXX", :backtrace=>["com.microsoft.azure.kusto.ingest.QueuedIngestClient.ingestFromFile(com/microsoft/azure/kusto/ingest/QueuedIngestClient.java:191)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62)", "jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:426)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:293)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_output_minus_kusto_minus_1_dot_0_dot_4_minus_java.lib.logstash.outputs.kusto.ingestor.upload(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kusto-1.0.4-java/lib/logstash/outputs/kusto/ingestor.rb:99)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_output_minus_kusto_minus_1_dot_0_dot_4_minus_java.lib.logstash.outputs.kusto.ingestor.upload_async(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kusto-1.0.4-java/lib/logstash/outputs/kusto/ingestor.rb:73)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:275)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.concurrent_minus_ruby_minus_1_dot_1_dot_9.lib.concurrent_minus_ruby.concurrent.executor.java_executor_service.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb:79)", "java.util.concurrent.Executors$RunnableAdapter.call(java/util/concurrent/Executors.java:515)", "java.util.concurrent.FutureTask.run(java/util/concurrent/FutureTask.java:264)", "java.util.concurrent.ThreadPoolExecutor.runWorker(java/util/concurrent/ThreadPoolExecutor.java:1128)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(java/util/concurrent/ThreadPoolExecutor.java:628)", "java.lang.Thread.run(java/lang/Thread.java:829)"]}

My Unix-based server is pretty beefed up, so is there a way for me to increase the number of queue capacity slots, or to make my Logstash config lighter, for example through batching or something similar? Or, in general, does anybody know how I can overcome this situation?
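
For reference, my understanding is that the generic Logstash batching knobs live in logstash.yml (or the equivalent -w / -b command-line flags); the values below are only placeholders I picked for illustration, not settings I have validated against this load:

# logstash.yml -- standard pipeline tuning options (example values only)
pipeline.workers: 8        # worker threads; defaults to the number of CPU cores
pipeline.batch.size: 500   # events per worker batch; default is 125
pipeline.batch.delay: 50   # ms to wait before flushing an undersized batch; default is 50

# or the same thing on the command line:
./logstash -f my.conf -w 8 -b 500

I am not sure whether larger batches would actually relieve the kusto ingestor queue or just fill it faster, which is part of my question.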

Below is a sanitized version of my config file:

input
{
#stdin{}
        syslog {
                host => "127.0.0.1"
                port => 12345
        }
}

filter
{
        if [program] =~ "box_Firewall_Activity" and [message] =~ "proto=" {
                grok {
                        keep_empty_captures => true
                        match => { "message" => "(?<Timezone>\+[\d]{2}:[\d]{2})\s+%{WORD:LogLevel}\s+(?<OriginSource>[a-zA-Z0-9]+)\s+%{WORD:Action}:\s+%{GREEDYDATA:msg}" }
                }
                dissect { mapping => { "msg" => "type=%{type}|proto=%{proto}|srcIF=%{srcIF}|srcIP=%{srcIP}|srcPort=%{srcPort}|srcMAC=%{srcMAC}|dstIP=%{dstIP}|dstPort=%{dstPort}|dstService=%{dstService}|dstIF=%{dstIF}|rule=%{rule}|info=%{info}|srcNAT=%{srcNAT}|dstNAT=%{dstNAT}|duration=%{duration}|count=%{count}|receivedBytes=%{receivedBytes}|sentBytes=%{sentBytes}|receivedPackets=%{receivedPackets}|sentPackets=%{sentPackets}|user=%{user}|protocol=%{protocol}|application=%{application}|target=%{target}|content=%{content}|urlcat=%{urlcat}" } }
                mutate { add_tag => "commonFW" add_tag => "PH2"  add_tag => "kv" add_tag => "myserver" add_tag => "box_Firewall_Activity_kv" add_field => { "EventTime" => "%{@timestamp}" } }
                prune { blacklist_names => [ "message", "msg" ] }
        }
        else if [program] =~ "box_Firewall_Activity" and [message] !~ "proto=" {
                grok {
                        keep_empty_captures => true
                        match => { "message" => "(?<Timezone>\+[\d]{2}:[\d]{2})\s+%{WORD:LogLevel}\s+(?<OriginSource>[a-zA-Z0-9]+)\s+%{WORD:Action}:\s+%{GREEDYDATA:msg}" }
                }
               csv { source => "msg" separator => "|" columns => ["type", "proto", "srcIF", "srcIP", "srcPort", "srcMAC", "dstIP", "dstPort", "dstService", "dstIF", "rule", "info", "srcNAT", "dstNAT", "duration", "count", "receivedBytes", "sentBytes", "receivedPackets", "sentPackets", "user", "protocol", "application", "target", "content", "urlcat"] }
                mutate { add_tag => "commonFW" add_tag => "PH2"  add_tag => "csvPipe" add_tag => "myserver" add_tag => "box_Firewall_Activity_v" add_field => { "EventTime" => "%{@timestamp}" } }
                prune { blacklist_names => [ "message", "msg" ] }
        }
        else if [program] =~ "box_Firewall_threat" {
                dissect { mapping => { "message" => "%{TZ} %{LogLevel}  %{LogCollector} %{Service}: [%{TrafficType}] %{Action}:   %{Module} %{Rule} |[%{Signature}]|%{}|%{SignatureVersion}|%{Technique}" } }
                mutate { add_tag => "ips" add_tag => "PH2"  add_tag => "myserver" add_tag => "box_Firewall_threat" add_field => { "EventTime" => "%{@timestamp}" } }
                prune { blacklist_names => [ "message", "msg" ] }
        } else { drop{} }
        cidr {
                add_tag => [ "InternalSIP" ]
                address => [ "%{srcIP}" ]
                network => [ "10.0.0.0/8", "The rest of my networks" ]
       }
        if [srcIP] =~ ":" { mutate { add_tag => "IPv6" add_tag => "InternalSIP" } }
        if "InternalSIP" not in [tags] {
        geoip { source => 'srcIP' default_database_type => "ASN" }
        if [geoip][as_org] =~ "test1" { mutate { add_tag => "InternalSIP" add_tag => "test1 VM" } }
        else if  [geoip][as_org] =~ "test2" { mutate { add_tag => "InternalSIP" add_tag => "test2 VM" } }
        }
if "InternalSIP" not in [tags] and "commonFW" in [tags] and ([Action] =~ "Block" or [Action] =~ "Denied" or [Action] =~ "Remove") { drop{} }
}
output
{
#       if "IPv6" in [tags] { exec { command => 'echo "allowed IPv6 %{srcIP}" ' } }
#       else if "InternalSIP" not in [tags] and "commonFW" in [tags] and ([Action] =~ "Block" or [Action] =~ "Denied" or [Action] =~ "Remove") { exec { command => 'echo "blocked ingress fw %{srcIP} %{Action}" ' } }
#       else if "InternalSIP" not in [tags] and "commonFW" in [tags] and ([Action] !~ "Block" or [Action] !~ "Denied" or [Action] !~ "Remove") { exec { command => 'echo "allowed ingress fw %{srcIP} %{Action}" ' } }
#       else if "InternalSIP" in [tags] and "commonFW" in [tags] { exec { command => 'echo "allowed internal or egress fw %{srcIP} %{tags}" ' } }
#       else if "ips" in [tags] { exec { command => 'echo "allowed ips %{Signature} OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO" ' } }
        if "commonFW" in [tags] {
                kusto {
                        path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm-ss}.txt"
                        ingest_url => "URL"
                        app_id => "id"
                        app_key => "key"
                        app_tenant => "tenant"
                        database => "DB"
                        table => "Table1"
                        json_mapping => "DBM1"
                }
        } else if "ips" in [tags] {
                kusto {
                        path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm-ss}.txt"
                        ingest_url => "URL"
                        app_id => "id"
                        app_key => "key"
                        app_tenant => "tenant"
                        database => "DB"
                        table => "Table2"
                        json_mapping => "DBM2"
                }
        }
}
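
One idea I have been toying with (I am not sure it is the right fix) is to make the path granularity coarser, so the plugin rotates one temporary file per minute instead of one per second, and perhaps to widen the plugin's internal upload pool. A rough sketch of what I mean for the first output block is below; note that upload_concurrent_count and upload_queue_size are an assumption on my part from skimming the plugin docs/source, and I have not confirmed they are exposed in version 1.0.4:

        kusto {
                # one temp file per minute instead of per second (just an idea to reduce file churn)
                path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt"
                ingest_url => "URL"
                app_id => "id"
                app_key => "key"
                app_tenant => "tenant"
                database => "DB"
                table => "Table1"
                json_mapping => "DBM1"
                # assumed options (unverified for 1.0.4): enlarge the upload thread pool and its queue
                upload_concurrent_count => 6
                upload_queue_size => 60
        }

Would reducing the file churn like this be the right direction, or is the real fix something else entirely?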

I also forgot to mention that eventually the error below occurs:

[DEBUG] 2022-07-28 11:40:19.864 [pool-4-thread-1] jvm - collector name {:name=>"ParNew"}
[DEBUG] 2022-07-28 11:40:19.864 [pool-4-thread-1] jvm - collector name {:name=>"ConcurrentMarkSweep"}
[DEBUG] 2022-07-28 11:40:20.071 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:20.456 [logstash-pipeline-flush] PeriodicFlush - Pushing flush onto pipeline.
[DEBUG] 2022-07-28 11:40:22.071 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:24.072 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:24.871 [pool-4-thread-1] jvm - collector name {:name=>"ParNew"}
[DEBUG] 2022-07-28 11:40:24.871 [pool-4-thread-1] jvm - collector name {:name=>"ConcurrentMarkSweep"}
[DEBUG] 2022-07-28 11:40:25.456 [logstash-pipeline-flush] PeriodicFlush - Pushing flush onto pipeline.
[DEBUG] 2022-07-28 11:40:26.072 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:27.808 [Connection evictor] PoolingHttpClientConnectionManager - Closing expired connections
[DEBUG] 2022-07-28 11:40:27.809 [Connection evictor] PoolingHttpClientConnectionManager - Closing connections idle longer than 120000 MILLISECONDS
[DEBUG] 2022-07-28 11:40:28.073 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:29.878 [pool-8-thread-1] jvm - collector name {:name=>"ParNew"}
[DEBUG] 2022-07-28 11:40:29.879 [pool-8-thread-1] jvm - collector name {:name=>"ConcurrentMarkSweep"}
[DEBUG] 2022-07-28 11:40:30.073 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:30.456 [logstash-pipeline-flush] PeriodicFlush - Pushing flush onto pipeline.
[DEBUG] 2022-07-28 11:40:30.485 [[main]>worker184] kusto - Starting stale files cleanup cycle {:files=>{}}
[DEBUG] 2022-07-28 11:40:30.485 [[main]>worker184] kusto - 0 stale files found {:inactive_files=>{}}
[DEBUG] 2022-07-28 11:40:32.074 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:34.074 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:34.886 [pool-4-thread-1] jvm - collector name {:name=>"ParNew"}
[DEBUG] 2022-07-28 11:40:34.886 [pool-4-thread-1] jvm - collector name {:name=>"ConcurrentMarkSweep"}
[DEBUG] 2022-07-28 11:40:35.456 [logstash-pipeline-flush] PeriodicFlush - Pushing flush onto pipeline.
[DEBUG] 2022-07-28 11:40:36.075 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:38.075 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:39.893 [pool-5-thread-1] jvm - collector name {:name=>"ParNew"}
[DEBUG] 2022-07-28 11:40:39.894 [pool-5-thread-1] jvm - collector name {:name=>"ConcurrentMarkSweep"}
[DEBUG] 2022-07-28 11:40:40.076 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:40.456 [logstash-pipeline-flush] PeriodicFlush - Pushing flush onto pipeline.
[DEBUG] 2022-07-28 11:40:42.076 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:44.077 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:44.902 [pool-5-thread-1] jvm - collector name {:name=>"ParNew"}
[DEBUG] 2022-07-28 11:40:44.903 [pool-5-thread-1] jvm - collector name {:name=>"ConcurrentMarkSweep"}
[DEBUG] 2022-07-28 11:40:45.456 [logstash-pipeline-flush] PeriodicFlush - Pushing flush onto pipeline.
[DEBUG] 2022-07-28 11:40:45.457 [[main]>worker209] kusto - Starting stale files cleanup cycle {:files=>{}}
[DEBUG] 2022-07-28 11:40:45.457 [[main]>worker209] kusto - 0 stale files found {:inactive_files=>{}}
[DEBUG] 2022-07-28 11:40:46.077 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:48.078 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:49.910 [pool-4-thread-1] jvm - collector name {:name=>"ParNew"}
[DEBUG] 2022-07-28 11:40:49.910 [pool-4-thread-1] jvm - collector name {:name=>"ConcurrentMarkSweep"}
[DEBUG] 2022-07-28 11:40:50.078 [Ruby-0-Thread-9: :1] kusto - Starting flush cycle
[DEBUG] 2022-07-28 11:40:50.456 [logstash-pipeline-flush] PeriodicFlush - Pushing flush onto pipeline.
[ERROR] 2022-07-28 11:40:50.513 [Kusto to ingest file: /tmp/kusto/2022-07-28-09-38-30.txt.testBarracudaFW.Marin] kusto - Uploading failed, retrying. {:exception=>Java::ComMicrosoftAzureKustoIngestExceptions::IngestionClientException, :message=>"Failed to ingest from file", :path=>"/tmp/kusto/2022-07-28-09-38-30.txt.testBarracudaFW.Marin", :backtrace=>["com.microsoft.azure.kusto.ingest.QueuedIngestClient.ingestFromFile(com/microsoft/azure/kusto/ingest/QueuedIngestClient.java:191)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62)", "jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:426)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:293)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_output_minus_kusto_minus_1_dot_0_dot_4_minus_java.lib.logstash.outputs.kusto.ingestor.upload(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kusto-1.0.4-java/lib/logstash/outputs/kusto/ingestor.rb:99)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_output_minus_kusto_minus_1_dot_0_dot_4_minus_java.lib.logstash.outputs.kusto.ingestor.upload_async(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kusto-1.0.4-java/lib/logstash/outputs/kusto/ingestor.rb:73)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:275)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.concurrent_minus_ruby_minus_1_dot_1_dot_9.lib.concurrent_minus_ruby.concurrent.executor.java_executor_service.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb:79)", "java.util.concurrent.Executors$RunnableAdapter.call(java/util/concurrent/Executors.java:515)", "java.util.concurrent.FutureTask.run(java/util/concurrent/FutureTask.java:264)", "java.util.concurrent.ThreadPoolExecutor.runWorker(java/util/concurrent/ThreadPoolExecutor.java:1128)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(java/util/concurrent/ThreadPoolExecutor.java:628)", "java.lang.Thread.run(java/lang/Thread.java:829)"]}
