Some logs to Elastic, some to SIEM

Hi,

I'm quite new here, so please forgive my dumbness on this. I've spent the whole afternoon trying to figure out how I can achieve what I want, and dug into the docs, forums and so on. Maybe it is because I'm coming from the world of syslog-ng, where we handle things differently, and maybe I need some time to get used to ELK :slight_smile:

So, here is the use case in my pilot.

From my Linux server I want to send some of the logs to Elastic using Filebeat, but security-related events to the SIEM. After several attempts, I tried this:

filebeat

filebeat.inputs:
- type: log
  paths:
    - "/var/log/audit/audit.log"
  tags: ["security","qradar"]

- type: log
  paths:
    - "/var/log/*"
  tags: ["linux","centos","anytag"]
  multiline.pattern: '^\['
  multiline.negate: true
  multiline.match: after

logstash

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    user => "elastic"
    password => "********"
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}

output {
if ["security","qradar"] in [tags] {

 file {
   path => "/tmp/qradar/test-%{+YYYY-MM-dd}.log"
   codec => line { format => "%{message}"}
 }
}
}

So, obviously this is not working :slight_smile: I'm not even sure this is the right approach. What I'm really looking for is a way to "filter" some logs from any source based on tags, source or even message content, and send them to a different destination, much like the "filter" statement in a syslog-ng log path.

Thank you
L:

It is not obvious at all. What issue do you have with it?

Hi,

Thank you for the quick response. If I add the conditional above, Logstash keeps restarting with this error:

[root@vladcentos7 conf.d]# tail -f /var/log/logstash/logstash-plain.log
[2019-06-20T10:54:17,580][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2019-06-20T10:54:17,674][INFO ][logstash.inputs.tcp ] Starting tcp input listener {:address=>"0.0.0.0:5514", :ssl_enable=>"false"}
[2019-06-20T10:54:17,688][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2019-06-20T10:54:18,047][INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:5514"}
[2019-06-20T10:54:18,060][INFO ][org.logstash.beats.Server] Starting server on port: 5045
[2019-06-20T10:54:18,084][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2019-06-20T10:54:18,212][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-06-20T10:54:18,435][INFO ][logstash.inputs.udp ] UDP listener started {:address=>"0.0.0.0:5514", :receive_buffer_bytes=>"106496", :queue_size=>"2000"}
[2019-06-20T10:54:18,575][FATAL][logstash.runner ] An unexpected error occurred! {:error=>org.logstash.config.ir.compiler.EventCondition$Compiler$UnexpectedTypeException: Unexpected input types class org.logstash.config.ir.expression.ValueExpression class org.logstash.config.ir.expression.EventValueExpression, :backtrace=>["org.logstash.config.ir.compiler.EventCondition$Compiler.in(org/logstash/config/ir/compiler/EventCondition.java:283)", "org.logstash.config.ir.compiler.EventCondition$Compiler.buildCondition(org/logstash/config/ir/compiler/EventCondition.java:97)", "org.logstash.config.ir.CompiledPipeline$CompiledExecution.lambda$compileDependencies$4(org/logstash/config/ir/CompiledPipeline.java:388)", "java.util.stream.ReferencePipeline$3$1.accept(java/util/stream/ReferencePipeline.java:193)", "java.util.stream.ReferencePipeline$2$1.accept(java/util/stream/ReferencePipeline.java:175)", "java.util.stream.ReferencePipeline$3$1.accept(java/util/stream/ReferencePipeline.java:193)", "java.util.Iterator.forEachRemaining(java/util/Iterator.java:116)", "java.util.Spliterators$IteratorSpliterator.forEachRemaining(java/util/Spliterators.java:1801)", "java.util.stream.AbstractPipeline.copyInto(java/util/stream/AbstractPipeline.java:482)", "java.util.stream.AbstractPipeline.wrapAndCopyInto(java/util/stream/AbstractPipeline.java:472)", "java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java/util/stream/ReduceOps.java:708)", "java.util.stream.AbstractPipeline.evaluate(java/util/stream/AbstractPipeline.java:234)", "java.util.stream.ReferencePipeline.collect(java/util/stream/ReferencePipeline.java:499)", "org.logstash.config.ir.CompiledPipeline$CompiledExecution.compileDependencies(org/logstash/config/ir/CompiledPipeline.java:400)", "org.logstash.config.ir.CompiledPipeline$CompiledExecution.flatten(org/logstash/config/ir/CompiledPipeline.java:361)", 
"org.logstash.config.ir.CompiledPipeline$CompiledExecution.lambda$compile$1(org/logstash/config/ir/CompiledPipeline.java:270)", "java.util.stream.ReferencePipeline$3$1.accept(java/util/stream/ReferencePipeline.java:193)", "java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java/util/ArrayList.java:1382)", "java.util.stream.AbstractPipeline.copyInto(java/util/stream/AbstractPipeline.java:482)", "java.util.stream.AbstractPipeline.wrapAndCopyInto(java/util/stream/AbstractPipeline.java:472)", "java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java/util/stream/ReduceOps.java:708)", "java.util.stream.AbstractPipeline.evaluate(java/util/stream/AbstractPipeline.java:234)", "java.util.stream.ReferencePipeline.collect(java/util/stream/ReferencePipeline.java:499)", "org.logstash.config.ir.CompiledPipeline$CompiledExecution.compile(org/logstash/config/ir/CompiledPipeline.java:271)", "org.logstash.config.ir.CompiledPipeline$CompiledExecution.(org/logstash/config/ir/CompiledPipeline.java:251)", "org.logstash.config.ir.CompiledPipeline.buildExecution(org/logstash/config/ir/CompiledPipeline.java:103)", "org.logstash.execution.WorkerLoop.(org/logstash/execution/WorkerLoop.java:46)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)", "org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:247)", "org.jruby.RubyClass.newInstance(org/jruby/RubyClass.java:915)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:235)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:295)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:274)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:270)", "java.lang.Thread.run(java/lang/Thread.java:748)"]}
[2019-06-20T10:54:18,950][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

What I'm really looking for is the most efficient way in Logstash to filter incoming messages and send some of them to different destinations. I think this is very basic functionality, but I can't find a real answer in the documentation.

Thank you
L:

logstash has raised an exception when compiling the configuration. The configuration you posted has a single beats input. The configuration it is compiling has two beats inputs and a udp input. We will not be able to diagnose the issue without seeing the configuration that it is failing to compile.
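
One detail in the backtrace may still narrow things down: the top frame is `EventCondition$Compiler.in`, which suggests the failure is in an `in` expression, and the only one posted is `if ["security","qradar"] in [tags]`. A minimal sketch of the same test written one tag at a time, on the assumption (not confirmed in this thread) that a list literal on the left-hand side of `in` is what the compiler rejects:

```
output {
  # Test each tag separately; join with "and" to require both, or "or" for either.
  if "security" in [tags] and "qradar" in [tags] {
    file {
      path  => "/tmp/qradar/test-%{+YYYY-MM-dd}.log"
      codec => line { format => "%{message}" }
    }
  }
}
```

The separate `output { elasticsearch { ... } }` block can stay as it is: Logstash merges multiple output sections, and an output that is not wrapped in a conditional receives every event.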

So, as I wrote earlier, the use case is quite simple (but I'm new to Logstash, so maybe my approach is totally wrong). I need to send some of the logs to one destination and some to another. I don't think there is a simpler one in log management, but it seems I'm totally lost and, to be honest, I'm a bit tired of debugging the config. Compared to other products (which I'm more familiar with), configuring Logstash is quite difficult, so I will be very happy if someone can help me put together my first simple config for this use case, so I'll be in love with Logstash :slight_smile:

So, here is my latest config, but it does not even compile. If someone can send me a sample config for the same use case, that would be great too.

input {
beats {
port => 5044
}
}

output {
if service.type == "auditd" {
file {
path => "/tmp/qradar/test-%{+YYYY-MM-dd}.log"
codec => line { format => "%{message}"}
}
else {
elasticsearch {
hosts => "localhost:9200"
user => "elastic"
password => "xxxxxxxxxx"
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
}

Thank you
L:

If that was the configuration you were running, you would not get those messages in your logfile; therefore that is not your complete configuration. My guess is that you have specified path.config as a directory and there are other files in the directory. If you pass Logstash a directory as path.config, it will read every file in the directory as part of the configuration, be it something.conf, something.conf.bak, or even hs_err_pid1234.log.
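
Apart from the extra files, the config as posted would probably not compile on its own either. Two things stand out: a nested field is referenced as `[service][type]` rather than `service.type`, and the `if` block needs its closing brace before `else`. A sketch of the same routing with only those two changes (hosts, credentials and paths are the ones from the post):

```
input {
  beats {
    port => 5044
  }
}

output {
  if [service][type] == "auditd" {
    # auditd events go to a local file only
    file {
      path  => "/tmp/qradar/test-%{+YYYY-MM-dd}.log"
      codec => line { format => "%{message}" }
    }
  } else {
    # everything else goes to Elasticsearch
    elasticsearch {
      hosts    => "localhost:9200"
      user     => "elastic"
      password => "xxxxxxxxxx"
      index    => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    }
  }
}
```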

Hi,

It seems the first part is solved like this

input {
beats {
port => 5044
}
}

output {
if [service][type] == "auditd" {

 syslog {
    appname => "security"
    host => "172.16.60.12"
    port => "514"
    protocol => "tcp"
    rfc => "rfc5424"
    codec => "line"
 }

stdout {}

}

elasticsearch {
    hosts => "localhost:9200"
    user => "elastic"
    password => "*****"
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    }

}

So, I can send auditd logs to SIEM and everything to Elasticsearch.

Now I want to do the same for Windows Security events, but it seems that requires a slightly different approach.

My latest attempt at the output conditional was this, but it does not catch anything. The Windows logs are sent by Winlogbeat to Logstash on the same port.

if [log_name] == "Security" {

Any idea how to filter everything in Windows Security Events?

A sample event looks like this

"message" => "An account was successfully logged on.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tWINSRV$\n\tAccount Domain:\t\tWORKGROUP\n\tLogon ID:\t\t0x3E7\n\nLogon Type:\t\t\t5\n\nImpersonation Level:\t\tImpersonation\n\nNew Logon:\n\tSecurity ID:\t\tS-1-5-18\n\tAccount Name:\t\tSYSTEM\n\tAccount Domain:\t\tNT AUTHORITY\n\tLogon ID:\t\t0x3E7\n\tLogon GUID:\t\t{00000000-0000-0000-0000-000000000000}\n\nProcess Information:\n\tProcess ID:\t\t0x1d0\n\tProcess Name:\t\tC:\Windows\System32\services.exe\n\nNetwork Information:\n\tWorkstation Name:\t-\n\tSource Network Address:\t-\n\tSource Port:\t\t-\n\nDetailed Authentication Information:\n\tLogon Process:\t\tAdvapi \n\tAuthentication Package:\tNegotiate\n\tTransited Services:\t-\n\tPackage Name (NTLM only):\t-\n\tKey Length:\t\t0\n\nThis event is generated when a logon session is created. It is generated on the computer that was accessed.\n\nThe subject fields indicate the account on the local system which requested the logon. This is most commonly a service such as the Server service, or a local process such as Winlogon.exe or Services.exe.\n\nThe logon type field indicates the kind of logon that occurred. The most common types are 2 (interactive) and 3 (network).\n\nThe New Logon fields indicate the account for whom the new logon was created, i.e. the account that was logged on.\n\nThe network fields indicate where a remote logon request originated. 
Workstation name is not always available and may be left blank in some cases.\n\nThe impersonation level field indicates the extent to which a process in the logon session can impersonate.\n\nThe authentication information fields provide detailed information about this specific logon request.\n\t- Logon GUID is a unique identifier that can be used to correlate this event with a KDC event.\n\t- Transited services indicate which intermediate services have participated in this logon request.\n\t- Package name indicates which sub-protocol was used among the NTLM protocols.\n\t- Key length indicates the length of the generated session key. This will be 0 if no session key was requested.",
"@timestamp" => 2019-06-20T09:21:58.366Z,
"agent" => {
"type" => "winlogbeat",
"id" => "208c3de3-a1dc-41f1-9616-7c23424bd6d6",
"ephemeral_id" => "e245ec55-6a2d-4c0d-8b91-e4e0f2c2d1f0",
"version" => "7.1.1",
"hostname" => "winsrv"
},
"log" => {
"level" => "information"
},
"@version" => "1",
"event" => {
"kind" => "event",
"action" => "Logon",
"created" => "2019-06-21T14:32:35.006Z",
"code" => 4624
},
"winlog" => {
"provider_name" => "Microsoft-Windows-Security-Auditing",
"keywords" => [
"Audit Success"
],
"record_id" => 7489,
"process" => {
"thread" => {
"id" => 908
},
"pid" => 472
},
"opcode" => "Info",
"task" => "Logon",
"channel" => "Security",
"version" => 1,
"provider_guid" => "{54849625-5478-4994-A5BA-3E3B0328C30D}",
"computer_name" => "winsrv",
"event_data" => {
"IpPort" => "-",
"IpAddress" => "-",
"TargetUserSid" => "S-1-5-18",
"SubjectLogonId" => "0x3e7",
"TransmittedServices" => "-",
"SubjectUserName" => "WINSRV$",
"WorkstationName" => "-",
"SubjectDomainName" => "WORKGROUP",
"ProcessName" => "C:\Windows\System32\services.exe",
"ImpersonationLevel" => "%%1833",
"AuthenticationPackageName" => "Negotiate",
"ProcessId" => "0x1d0",
"TargetUserName" => "SYSTEM",
"LogonType" => "5",
"KeyLength" => "0",
"SubjectUserSid" => "S-1-5-18",
"TargetDomainName" => "NT AUTHORITY",
"LmPackageName" => "-",
"TargetLogonId" => "0x3e7",
"LogonGuid" => "{00000000-0000-0000-0000-000000000000}",
"LogonProcessName" => "Advapi "
},
"api" => "wineventlog",
"event_id" => 4624
},
"host" => {
"architecture" => "x86_64",
"id" => "cc734299-d910-4f9c-ac26-36eb408de643",
"name" => "winsrv",
"hostname" => "winsrv",
"os" => {
"family" => "windows",
"platform" => "windows",
"name" => "Windows Server 2012 R2 Datacenter",
"version" => "6.3",
"kernel" => "6.3.9600.19376 (winblue_ltsb_escrow.190520-1700)",
"build" => "9600.19377"
}
}
}
{
"tags" => [
"beats_input_codec_plain_applied"
],
"ecs" => {
"version" => "1.0.0"
},

Thanks

So, the filtering issue has been solved like this

if [winlog][channel] == "Security" or [service][type] == "auditd" {

     syslog {
        appname => "security"
        host => "172.16.60.12"
        port => "514"
        protocol => "udp"
        #rfc => "rfc5424"
        codec => line { format => "%{message}" }
        #codec => line
        sourcehost => "%{host}"
     }}

So my SIEM receives the messages it should, and Elasticsearch receives everything.

However I still have two basic issues to solve

SIEM (Qradar) recognizes the linux audit messages and parses correctly from the following forwarder input (from Logstash)

<13>Jun 24 15:14:59 {"name":"CentOS7Vlad","hostname":"vladcentos7.intra","id":"05976e934c954085a28184accf4245f1","os":{"name":"CentOS Linux","family":"redhat","version":"7 (Core)","kernel":"3.10.0-957.21.2.el7.x86_64","platform":"centos","codename":"Core"},"architecture":"x86_64","containerized":false} security: type=CRYPTO_KEY_USER msg=audit(1561389292.770:839): pid=19824 uid=0 auid=1000 ses=1 subj=system_u:system_r:sshd_t:s0-s0:c0.c1023 msg='op=destroy kind=server fp=SHA256:f3:7b:d7:68:b3:e4:31:f9:e6:b5:35:18:27:aa:a5:0b:61:b4:e4:9d:bc:9b:ef:c5:7e:95:2e:1d:1f:fd:e3:1a direction=? spid=98029 suid=1000 exe="/usr/sbin/sshd" hostname=? addr=? terminal=? res=success'

However, as you can see, this syslog message does not comply with the standards, so QRadar cannot parse certain values (the most important is the source host, so that it does not have to rely on the IP header).

Does anyone here already have a trick for this? I suppose a properly built format for the line codec would be sufficient, or maybe some filter.

I have something like this in mind:

codec => line { format => "%{original_timestamp} %{original_host} %{message}" }
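
Before building the whole header by hand, it may be worth checking where the JSON blob in the forwarded line comes from. It sits where the hostname belongs, which would be consistent with `sourcehost => "%{host}"` rendering the entire `host` object. A sketch that points `sourcehost` at a single nested field instead, assuming the events carry `[host][name]` as in the Winlogbeat sample above and that the syslog output uses `sourcehost` for the hostname part of the header it writes:

```
output {
  if [winlog][channel] == "Security" or [service][type] == "auditd" {
    syslog {
      host       => "172.16.60.12"
      port       => 514
      protocol   => "udp"
      appname    => "security"
      # [host] is an object; reference one field inside it
      sourcehost => "%{[host][name]}"
      codec      => line { format => "%{message}" }
    }
  }
}
```

The `rfc` option that is commented out in the config above is the one that selects between `rfc3164` and `rfc5424` framing.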

Thank you


Adding some more info, as suggested by my account manager. Maybe someone can help, so that I don't have to put a real syslog server in place of Logstash as a relay.

Yes, this is the issue. I can only add %{message}; I cannot build a full syslog header including %{logsource} and the other important fields. It works neither when I create a new field (as in my config below) nor when I use the codec option of the syslog output, where I think I should also be able to build a proper syslog message like this:

codec => line { format => "<%{syslog_pri}> %{sourcehost}%{message}" }

It seems these variables are never replaced by the actual values, because in the output I always see the variable name and not its value, even if I try something from the official docs like this:

filter {
mutate {
replace => { "message" => "%{hostname}: My new message" }
}
}
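
One possible explanation, offered as a guess: a `%{...}` reference to a field that does not exist on the event is left in the output as literal text. The sample events in this thread have no top-level `hostname` field; the name is nested under `host`. A sketch of the same `mutate` using the nested reference, assuming `[host][name]` is present on the event:

```
filter {
  mutate {
    # nested fields use the [outer][inner] form inside %{...}
    replace => { "message" => "%{[host][name]}: My new message" }
  }
}
```

If the placeholder still appears verbatim, adding `stdout { codec => rubydebug }` to the output section prints each event with all of its fields, which shows what is actually available to reference.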

So, in summary, I would like to build proper syslog output from Logstash towards QRadar, meaning that the outgoing message complies with one of the syslog RFCs, either BSD syslog or the newer syslog protocol:

RFC5424

RFC3164

Is there any chance to achieve this with Logstash, or should I put my efforts into syslog-ng/nxlog/rsyslog?

Thanks
Laszlo

This got a bit complicated and changed direction. The current problem is that you need to change the format of your log messages so that your SIEM can properly work with them, right?

mutate sounds like a good start. If you are still stuck, can you provide a log event that you get and what it should look like after processing? That would make it much easier to help you build the right filter.

It is not complicated at all, I think :slight_smile: I've faced similar issues with Graylog, so it seems this whole ecosystem (and admittedly most of the non-syslog syslog implementations) struggles with the same issue.

A message coming from a file source, for example (and most of the time the same is true when it comes from a network (syslog) source), must be forwarded to another syslog server (in this case the SIEM syslog receiver) either without altering the original message or as an RFC-compliant syslog message. This is where most of the non-syslog syslog implementations bleed out.

So, at the moment, the only solution I see is to put a real syslog implementation in the middle as a relay (syslog-ng, nxlog, rsyslog) and use it to feed both ES / Graylog and the SIEM. ES / Graylog is great if you stay within the ecosystem (much like Apple :)), but if you need something more flexible, you are doomed :slight_smile:

Thanks anyway
L:

PS: the one thing I can't get out of my mind is why my mutate filter isn't working at all. I even tried examples from the docs, but in the forwarded messages the variables are never substituted by the actual values, except for %{message}. Anyway, all the configs from my attempts are in this thread above.

Discuss is best effort. If it takes more than a minute to understand your current problem (which is the case here), your chances of somebody helping you out start to diminish.

If it's a question around mutate, it should be relatively straightforward if there is the input, the expected output, and what you are currently trying out in your filter.
