Hi everyone! I'm new to ELK and have been very much enjoying working with it so far. I am currently evaluating ELK with Elastic Security as a SIEM in a test environment. I have tried sending data from both Filebeat (with the system module) and Auditbeat (with all of its modules) through a Logstash server that forwards the logs to Elasticsearch/Kibana. All of these logs are parsed perfectly and I get all the fields needed for Elastic Security functionality, such as event.category, so the hosts show up in Elastic Security with "uncommon processes", "authentications" and so on.
However, I am also sending logs from Winlogbeat to the same Logstash input, and while these events get a lot of the correct fields in Kibana, vital fields for Elastic Security such as event.category aren't added; instead I either get a bunch of data in the event.original field or in ignored_field_values. Because of this, "authentications", "uncommon processes" and the like don't show up for the hosts in Elastic Security. When I send the logs from Winlogbeat directly to Elasticsearch, the event.category field shows up correctly. My guess is that Logstash isn't applying the correct ingest pipeline when it sends the events to Elasticsearch, but as far as I can see I have set it up according to the documentation, which describes the same method as for the other Beats. If someone could point out what I might be doing wrong it would be greatly appreciated.
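To try to narrow this down, I'm planning to temporarily add a stdout output with the rubydebug codec (metadata enabled) to the Logstash pipeline, so I can see whether the Winlogbeat events actually arrive with [@metadata][pipeline] set. This is just a debugging sketch, not part of my real config:

output {
  # Print every event including its @metadata, to check whether
  # Winlogbeat events carry [@metadata][pipeline] at all
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
}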
Relevant parts of my winlogbeat.yml (there are a lot more event IDs in the processors that I left out to keep this shorter; I have also tried not filtering events at all):
winlogbeat.event_logs:
  - name: Security
    processors:
      - drop_event.when.not.or:
          - equals.winlog.event_id: "4608"
          - equals.winlog.event_id: "4609"
          - equals.winlog.event_id: "4610"
  - name: Microsoft-Windows-Sysmon/Operational
    processors:
      - drop_event.when.not.or:
          - equals.winlog.event_id: "4608"
          - equals.winlog.event_id: "4609"
          - equals.winlog.event_id: "4610"
  - name: Windows PowerShell
    processors:
      - drop_event.when.not.or:
          - equals.winlog.event_id: "4608"
          - equals.winlog.event_id: "4609"
          - equals.winlog.event_id: "4610"

output.logstash:
  hosts: ["x.x.x.x:5044"]

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
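For comparison, when I tested sending from Winlogbeat directly to Elasticsearch (where event.category does show up correctly), the output section looked roughly like this; hosts and credentials are redacted, so treat it as a sketch rather than the exact config:

output.elasticsearch:
  hosts: ["https://x.x.x.x:9200"]
  username: "xxx"
  password: "xxx"
  ssl.certificate_authorities: ["xxx"]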
Relevant parts of auditbeat.yml, since this one is working (modules are enabled outside of the file):
output.logstash:
  hosts: ["100.72.20.38:5044"]

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
The Logstash server runs on Rocky Linux 9. This is /etc/logstash/conf.d/beats.conf, which is used for all Beats inputs (I will harden this later with security best practices such as TLS on the input, but right now I'm just trying to get the basic functionality working):
input {
  beats {
    port => 5044
  }
}

output {
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => "https://x.x.x.x:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      action => "create"
      pipeline => "%{[@metadata][pipeline]}"
      user => "xxx"
      password => "xxx"
      cacert => 'xxx'
    }
  } else {
    elasticsearch {
      hosts => "https://x.x.x.x:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      action => "create"
      user => "xxx"
      password => "xxx"
      cacert => 'xxx'
    }
  }
}
When I tried adding pipeline => "%{[@metadata][pipeline]}" to the "else" block as well, I got the following error:
:response=>{"create"=>{"_index"=>"winlogbeat-8.8.2", "_id"=>nil, "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"pipeline with id [%{[@metadata][pipeline]}] does not exist"}}}}
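One thing I still want to verify is whether any Winlogbeat ingest pipelines were ever loaded into the cluster at all, since Logstash can only reference pipelines that already exist in Elasticsearch. Something roughly like this should list them (credentials and the CA path are placeholders):

# List any winlogbeat-* ingest pipelines that exist on the cluster
curl -s --cacert /path/to/ca.crt -u xxx:xxx "https://x.x.x.x:9200/_ingest/pipeline/winlogbeat-*"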