Hello All,
Using the kusto output plugin for Azure Data explorer I did a successful test with the example as per the documentation below
References:
Ingest data from Logstash to Azure Data Explorer | Microsoft Docs
GitHub - Azure/logstash-output-kusto: Logstash output for Kusto
Then I repeated the exercise just replacing the values with the relevant ones in the configuration file but upon starting the pipeline an error pops up.
- My config file is:
input {
# Receive syslog messages on the loopback interface only; events arrive
# with a "program" field that the filter section keys on.
syslog {
host => "127.0.0.1"
port => 5004
}
}
filter
{
# Only keep syslog events whose program matches the firewall; everything
# else is dropped by the else branch below.
if [program] =~ "box_Firewall" {
# Split the raw line into timezone offset, log level, originating host,
# action, and the remaining pipe-delimited payload captured as "msg".
grok
{
match => { "message" => "^(?<Timezone>\+[\d]{2}:[\d]{2})\s+%{WORD:LogLevel}\s+(?<OriginSource>[a-zA-Z0-9]+)\s+%{WORD:Action}:\s+%{GREEDYDATA:msg}$" }
}
# Break the pipe-delimited "msg" payload into its 26 named fields.
# NOTE: the field names here (e.g. "app") must match the JSON paths used
# in the Kusto ingestion mapping, since events are shipped as JSON.
dissect
{
mapping => { "msg" => "%{type}|%{proto}|%{srcIF}|%{srcIP}|%{srcPort}|%{srcMAC}|%{dstIP}|%{dstPort}|%{dstService}|%{dstIF}|%{rule}|%{info}|%{srcNAT}|%{dstNAT}|%{duration}|%{count}|%{receivedBytes}|%{sentBytes}|%{receivedPackets}|%{sentPackets}|%{user}|%{protocol}|%{app}|%{target}|%{content}|%{urlcat}" }
}
}
else { drop{} }
}
output
{
# NOTE(review): the "(EFAULT) Bad address - echo" error in the posted log is
# raised by logstash-output-exec (the backtrace goes through
# logstash-output-exec-3.1.4/lib/logstash/outputs/exec.rb), NOT by the kusto
# plugin. An exec output running "echo" must therefore be active in the
# pipeline. If Logstash's path.config points at a directory (e.g. conf.d),
# every file in it is concatenated into one pipeline -- check for another
# config file that still defines exec { command => "echo ..." }.
# The commented-out exec/stdout experiments that used to live here have been
# removed to avoid confusion.
kusto {
# Local staging path for the plugin; the %{...} date pattern keeps each
# buffered file unique (required dynamic part per the plugin README).
path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm-ss}.txt"
ingest_url => "https://ingest-networkservices.westeurope.kusto.windows.net"
# AAD application credentials (redacted).
app_id => "XXX"
app_key => "XXX"
app_tenant => "XXX"
database => "firewall"
table => "barracuda"
# Must match the ingestion JSON mapping name created in ADX ('fw').
json_mapping => "fw"
}
}
- An example log entry
+01:00 Security zdecuda01 Block: LIN|ICMP|bond0.2900|10.61.24.3|63037|50:6b:8d:cc:43:a4|10.61.24.1|63037||bond0.2900|OP-SRV-VPN|0|10.61.24.3|10.61.24.1|0|1|0|0|0|0||||||
- In Azure Data Explorer, in its Query panel, one needs to create a table and a mapping.
My table is:
.create table barracuda (timestamp: datetime, Timezone: string, LogLevel: string, OriginSource: string, Action: string, type: string, proto: string, srcIF: string, srcIP: string, srcPort: string, srcMAC: string, dstIP: string, dstPort: string, dstService: string, dstIF: string, rule: string, info: string, srcNAT: string, dstNAT: string, duration: string, count: string, receivedBytes: string, sentBytes: string, receivedPackets: string, sentPackets: string, user: string, protocol: string, app: string, target: string, content: string, urlcat: string)
- My mapping is
.create table barracuda ingestion json mapping 'fw' '[{"column":"timestamp","path":"$.@timestamp"},{"column":"Timezone","path":"$.Timezone"},{"column":"LogLevel","path":"$.LogLevel"},{"column":"OriginSource","path":"$.OriginSource"},{"column":"Action","path":"$.Action"},{"column":"type","path":"$.type"},{"column":"proto","path":"$.proto"},{"column":"srcIF","path":"$.srcIF"},{"column":"srcIP","path":"$.srcIP"},{"column":"srcPort","path":"$.srcPort"},{"column":"srcMAC","path":"$.srcMAC"},{"column":"dstIP","path":"$.dstIP"},{"column":"dstPort","path":"$.dstPort"},{"column":"dstService","path":"$.dstService"},{"column":"dstIF","path":"$.dstIF"},{"column":"rule","path":"$.rule"},{"column":"info","path":"$.info"},{"column":"srcNAT","path":"$.srcNAT"},{"column":"dstNAT","path":"$.dstNAT"},{"column":"duration","path":"$.duration"},{"column":"count","path":"$.count"},{"column":"receivedBytes","path":"$.receivedBytes"},{"column":"sentBytes","path":"$.sentBytes"},{"column":"receivedPackets","path":"$.receivedPackets"},{"column":"sentPackets","path":"$.sentPackets"},{"column":"user","path":"$.user"},{"column":"protocol","path":"$.protocol"},{"column":"app","path":"$.application"},{"column":"target","path":"$.target"},{"column":"content","path":"$.content"},{"column":"urlcat","path":"$.urlcat"}]'
- The error is -
:error=>"(EFAULT) Bad address - echo"
FULL ERROR
[2022-01-12T21:32:21,237][ERROR][logstash.javapipeline ] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"dataexplorer", :error=>"(EFAULT) Bad address - echo", :exception=>Java::OrgJrubyExceptions::SystemCallError, :backtrace=>["org.jruby.RubyProcess.spawn(org/jruby/RubyProcess.java:1670)", "org.jruby.RubyKernel.spawn(org/jruby/RubyKernel.java:1658)", "uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.open3.popen_run(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/open3.rb:199)", "uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.open3.popen3(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/open3.rb:95)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_output_minus_exec_minus_3_dot_1_dot_4.lib.logstash.outputs.exec.receive(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-exec-3.1.4/lib/logstash/outputs/exec.rb:51)", "usr.share.logstash.logstash_minus_core.lib.logstash.outputs.base.multi_receive(/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:105)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1821)", "usr.share.logstash.logstash_minus_core.lib.logstash.outputs.base.multi_receive(/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb:105)", "org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.multi_receive(org/logstash/config/ir/compiler/OutputStrategyExt.java:143)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:121)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:299)"], :thread=>"#<Thread:0x323e009e sleep>"}
[2022-01-12T21:32:21,537][INFO ][logstash.inputs.syslog ] connection error: {:exception=>IOError, :message=>"closed stream"}
[2022-01-12T21:32:26,744][INFO ][logstash.javapipeline ] Pipeline terminated {"pipeline.id"=>"dataexplorer"}
The config file is tested and it works with stdout{}
for instance (logs are arriving at port 5004, grok and dissect work, etc.).
I'm really stuck, so any advice would be much appreciated.