Dear All,
I use the ADX output plugin to send FW logs to ADX.
I first tried it with a stdout{}
plugin to massage the data and parse it my way and it was working fine.
Now, when I enabled the kusto
output (that is what the ADX output plugin is called) and started Logstash manually from the command line with ./logstash -f my.conf,
it also works fine. However, because the log stream is huge (almost 120 FW appliances feeding in data), after a couple of minutes the ingestor queue capacity starts running low, drops to 0 free slots, and ingestion stops. Please see the log snippet below:
[INFO ] 2022-07-26 07:10:44.077 [[main]>worker5] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-07-10-43.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.078 [[main]>worker5] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-11-10-43.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.107 [[main]>worker4] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-05-10-44.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.340 [[main]>worker7] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-08-40-44.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.345 [[main]>worker1] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-04-10-43.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.538 [[main]>worker0] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-12-10-44.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.539 [[main]>worker0] kusto - Opening file {:path=>"/tmp/kusto/2022-07-26-11-10-44.txt.testXXXMYTableXXX.testXXXMYTableXXX"}
[INFO ] 2022-07-26 07:10:44.721 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-11.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.746 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.747 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.747 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.748 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.748 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.749 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-11.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.749 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.749 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-03-41-12.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.750 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-03-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.750 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.750 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.750 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.751 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-13.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.751 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.753 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.755 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.756 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.756 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.756 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-03-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.756 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-15.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.757 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-15.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.757 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-06-41-14.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-15.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-04-41-17.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-11-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.758 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-03-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[INFO ] 2022-07-26 07:10:44.759 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.759 [[main]>worker3] kusto - Ingestor queue capacity is running low with 3 free slots.
[INFO ] 2022-07-26 07:10:44.760 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-12-41-17.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.760 [[main]>worker3] kusto - Ingestor queue capacity is running low with 2 free slots.
[INFO ] 2022-07-26 07:10:44.760 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-17.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.760 [[main]>worker3] kusto - Ingestor queue capacity is running low with 1 free slots.
[INFO ] 2022-07-26 07:10:44.761 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.761 [[main]>worker3] kusto - Ingestor queue capacity is running low with 0 free slots.
[INFO ] 2022-07-26 07:10:44.761 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-08-11-17.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.762 [[main]>worker3] kusto - Ingestor queue capacity is running low with 0 free slots.
[INFO ] 2022-07-26 07:10:44.762 [[main]>worker3] kusto - Closing file /tmp/kusto/2022-07-26-10-41-16.txt.testXXXMYTableXXX.testXXXMYTableXXX
[WARN ] 2022-07-26 07:10:44.762 [[main]>worker3] kusto - Ingestor queue capacity is running low with 0 free slots.
[ERROR] 2022-07-26 07:12:41.928 [Kusto to ingest file: /tmp/kusto/2022-07-26-04-41-11.txt.testXXXMYTableXXX.testXXXMYTableXXX] kusto - Uploading failed, retrying. {:exception=>Java::ComMicrosoftAzureKustoIngestExceptions::IngestionClientException, :message=>"Failed to ingest from file", :path=>"/tmp/kusto/2022-07-26-04-41-11.txt.testXXXMYTableXXX.testXXXMYTableXXX", :backtrace=>["com.microsoft.azure.kusto.ingest.QueuedIngestClient.ingestFromFile(com/microsoft/azure/kusto/ingest/QueuedIngestClient.java:191)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62)", "jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:426)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:293)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_output_minus_kusto_minus_1_dot_0_dot_4_minus_java.lib.logstash.outputs.kusto.ingestor.upload(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kusto-1.0.4-java/lib/logstash/outputs/kusto/ingestor.rb:99)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_output_minus_kusto_minus_1_dot_0_dot_4_minus_java.lib.logstash.outputs.kusto.ingestor.upload_async(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kusto-1.0.4-java/lib/logstash/outputs/kusto/ingestor.rb:73)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:275)", 
"usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.concurrent_minus_ruby_minus_1_dot_1_dot_9.lib.concurrent_minus_ruby.concurrent.executor.java_executor_service.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb:79)", "java.util.concurrent.Executors$RunnableAdapter.call(java/util/concurrent/Executors.java:515)", "java.util.concurrent.FutureTask.run(java/util/concurrent/FutureTask.java:264)", "java.util.concurrent.ThreadPoolExecutor.runWorker(java/util/concurrent/ThreadPoolExecutor.java:1128)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(java/util/concurrent/ThreadPoolExecutor.java:628)", "java.lang.Thread.run(java/lang/Thread.java:829)"]}
My Unix-based server is pretty beefed up, so is there a way for me to increase the queue capacity (number of slots), or to make my Logstash configuration lighter, e.g. via batching or something similar? Or, in general, does anybody know how I can overcome this situation?
Below is the sanitized version of my config file
# Input: firewall logs arrive via syslog on the loopback interface only.
input
{
  # stdin{} was used during initial testing; kept for reference.
  #stdin{}
  syslog {
    host => "127.0.0.1"
    port => 12345
  }
}
filter
{
# Key/value-style firewall activity lines (payload contains "proto="):
# grok extracts the log envelope, dissect splits the pipe-delimited k=v payload.
if [program] =~ "box_Firewall_Activity" and [message] =~ "proto=" {
  grok {
    keep_empty_captures => true
    match => { "message" => "(?<Timezone>\+[\d]{2}:[\d]{2})\s+%{WORD:LogLevel}\s+(?<OriginSource>[a-zA-Z0-9]+)\s+%{WORD:Action}:\s+%{GREEDYDATA:msg}" }
  }
  # FIX: "dstIP=%{dstIP}" replaces the original "dstIP=%{stIP}" typo, which
  # stored the destination IP in a field named [stIP] instead of [dstIP]
  # (the parallel csv branch maps the same column to "dstIP").
  dissect { mapping => { "msg" => "type=%{type}|proto=%{proto}|srcIF=%{srcIF}|srcIP=%{srcIP}|srcPort=%{srcPort}|srcMAC=%{srcMAC}|dstIP=%{dstIP}|dstPort=%{dstPort}|dstService=%{dstService}|dstIF=%{dstIF}|rule=%{rule}|info=%{info}|srcNAT=%{srcNAT}|dstNAT=%{dstNAT}|duration=%{duration}|count=%{count}|receivedBytes=%{receivedBytes}|sentBytes=%{sentBytes}|receivedPackets=%{receivedPackets}|sentPackets=%{sentPackets}|user=%{user}|protocol=%{protocol}|application=%{application}|target=%{target}|content=%{content}|urlcat=%{urlcat}" } }
  mutate { add_tag => "commonFW" add_tag => "PH2" add_tag => "kv" add_tag => "myserver" add_tag => "box_Firewall_Activity_kv" add_field => { "EventTime" => "%{@timestamp}" } }
  # Drop the raw and intermediate payload fields once they are parsed.
  prune { blacklist_names => [ "message", "msg" ] }
}
# Pipe-separated firewall activity lines WITHOUT "proto=": same grok envelope,
# but the payload is positional, so a csv filter maps each column by order.
else if [program] =~ "box_Firewall_Activity" and [message] !~ "proto=" {
  grok {
    keep_empty_captures => true
    match => { "message" => "(?<Timezone>\+[\d]{2}:[\d]{2})\s+%{WORD:LogLevel}\s+(?<OriginSource>[a-zA-Z0-9]+)\s+%{WORD:Action}:\s+%{GREEDYDATA:msg}" }
  }
  csv {
    source    => "msg"
    separator => "|"
    columns   => ["type", "proto", "srcIF", "srcIP", "srcPort", "srcMAC", "dstIP", "dstPort", "dstService", "dstIF", "rule", "info", "srcNAT", "dstNAT", "duration", "count", "receivedBytes", "sentBytes", "receivedPackets", "sentPackets", "user", "protocol", "application", "target", "content", "urlcat"]
  }
  # NOTE(review): tag "box_Firewall_Activity_v" differs from the kv branch's
  # "box_Firewall_Activity_kv" — presumably the csv variant; confirm it is not
  # a typo for "box_Firewall_Activity_csv".
  mutate {
    add_tag   => "commonFW"
    add_tag   => "PH2"
    add_tag   => "csvPipe"
    add_tag   => "myserver"
    add_tag   => "box_Firewall_Activity_v"
    add_field => { "EventTime" => "%{@timestamp}" }
  }
  # Drop the raw and intermediate payload fields once they are parsed.
  prune { blacklist_names => [ "message", "msg" ] }
}
# Threat/IPS events: fully positional, parsed with a single dissect mapping.
else if [program] =~ "box_Firewall_threat" {
  dissect { mapping => { "message" => "%{TZ} %{LogLevel} %{LogCollector} %{Service}: [%{TrafficType}] %{Action}: %{Module} %{Rule} |[%{Signature}]|%{}|%{SignatureVersion}|%{Technique}" } }
  mutate {
    add_tag   => "ips"
    add_tag   => "PH2"
    add_tag   => "myserver"
    add_tag   => "box_Firewall_threat"
    add_field => { "EventTime" => "%{@timestamp}" }
  }
  # Drop the raw and intermediate payload fields once they are parsed.
  prune { blacklist_names => [ "message", "msg" ] }
}
# Anything that is not a recognized firewall activity/threat event is discarded.
else { drop{} }
# Tag events whose source IP falls inside internal address space.
cidr {
  add_tag => [ "InternalSIP" ]
  address => [ "%{srcIP}" ]
  network => [ "10.0.0.0/8", "The rest of my networks" ]
}

# IPv6 sources (any ":" in srcIP) are tagged and treated as internal as well.
if [srcIP] =~ ":" {
  mutate {
    add_tag => "IPv6"
    add_tag => "InternalSIP"
  }
}

# For remaining external sources, look up the ASN; sources belonging to the
# named organizations are reclassified as internal VMs.
if "InternalSIP" not in [tags] {
  geoip { source => 'srcIP' default_database_type => "ASN" }
  if [geoip][as_org] =~ "test1" { mutate { add_tag => "InternalSIP" add_tag => "test1 VM" } }
  else if [geoip][as_org] =~ "test2" { mutate { add_tag => "InternalSIP" add_tag => "test2 VM" } }
}

# Blocked/denied/removed traffic from external sources is not worth ingesting.
if "InternalSIP" not in [tags] and "commonFW" in [tags] and ([Action] =~ "Block" or [Action] =~ "Denied" or [Action] =~ "Remove") { drop{} }
}
output
{
  # Debug exec outputs kept for reference:
  # if "IPv6" in [tags] { exec { command => 'echo "allowed IPv6 %{srcIP}" ' } }
  # else if "InternalSIP" not in [tags] and "commonFW" in [tags] and ([Action] =~ "Block" or [Action] =~ "Denied" or [Action] =~ "Remove") { exec { command => 'echo "blocked ingress fw %{srcIP} %{Action}" ' } }
  # else if "InternalSIP" not in [tags] and "commonFW" in [tags] and ([Action] !~ "Block" or [Action] !~ "Denied" or [Action] !~ "Remove") { exec { command => 'echo "allowed ingress fw %{srcIP} %{Action}" ' } }
  # else if "InternalSIP" in [tags] and "commonFW" in [tags] { exec { command => 'echo "allowed internal or egress fw %{srcIP} %{tags}" ' } }
  # else if "ips" in [tags] { exec { command => 'echo "allowed ips %{Signature} OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO" ' } }

  # FIX: the original path rotated to a NEW temp file every second
  # ("%{+YYYY-MM-dd-HH-mm-ss}"), so every worker produced thousands of tiny
  # files (visible as the rapid Opening/Closing file churn in the log) and
  # each file became a separate queued ingest operation, exhausting the
  # plugin's ingestor queue ("0 free slots"). Rotating per MINUTE batches
  # ~60x more events per ingest operation with no change to the data sent.
  # NOTE(review): also consider upgrading logstash-output-kusto beyond 1.0.4 —
  # newer releases improve batching/queue handling; verify against the
  # plugin's changelog.
  if "commonFW" in [tags] {
    kusto {
      path         => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt"
      ingest_url   => "URL"
      app_id       => "id"
      app_key      => "key"
      app_tenant   => "tenant"
      database     => "DB"
      table        => "Table1"
      json_mapping => "DBM1"
    }
  } else if "ips" in [tags] {
    kusto {
      path         => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt"
      ingest_url   => "URL"
      app_id       => "id"
      app_key      => "key"
      app_tenant   => "tenant"
      database     => "DB"
      table        => "Table2"
      json_mapping => "DBM2"
    }
  }
}