Winlogbeat logs sent through Logstash aren't parsed correctly

Hi everyone! I'm new to ELK and have very much enjoyed working with it so far. I am currently evaluating ELK with Elastic Security as a SIEM in a test environment. I have tried sending data from both Filebeat (with the system module) and Auditbeat (with all its modules) through a Logstash server that forwards the logs to Elasticsearch/Kibana. All of these logs are parsed perfectly, and I get all the fields needed for Elastic Security functionality, such as event.category, so the hosts show up in Elastic Security with "uncommon processes", "authentications", etc.

However, I am also sending logs from Winlogbeat to the same Logstash input. These events get many of the correct fields in Kibana, but fields that are vital for Elastic Security, such as event.category, aren't added, and I get a bunch of data in either the event.original field or ignored_field_values. Because of this, "authentications", "uncommon processes" and similar views don't show up for the hosts in Elastic Security. When I send the logs from Winlogbeat directly to Elasticsearch, the event.category field shows up correctly. My guess is that Logstash isn't using the correct ingest pipeline when sending the logs to Elasticsearch, but as far as I can see I have followed the documentation, which describes the same method as for the other Beats. If someone could help me figure out what I might be doing wrong, it would be greatly appreciated.

Vital information from a winlogbeat.yml file (I have a lot more events in the processors and didn't add them all here to keep the summary shorter, I have also tried not filtering events at all):

winlogbeat.event_logs:

  - name: Security
    processors:
      - drop_event.when.not.or:
        - equals.winlog.event_id: "4608"
        - equals.winlog.event_id: "4609"
        - equals.winlog.event_id: "4610"

  - name: Microsoft-Windows-Sysmon/Operational
    processors:
      - drop_event.when.not.or:
        - equals.winlog.event_id: "4608"
        - equals.winlog.event_id: "4609"
        - equals.winlog.event_id: "4610"

  - name: Windows PowerShell
    processors:
      - drop_event.when.not.or:
        - equals.winlog.event_id: "4608"
        - equals.winlog.event_id: "4609"
        - equals.winlog.event_id: "4610"

output.logstash:

  hosts: ["x.x.x.x:5044"]

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

Vital information from auditbeat.yml since this is working (modules are enabled outside of the file):

output.logstash:
  hosts: ["100.72.20.38:5044"]

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~

Logstash server running on Rocky Linux 9, file /etc/logstash/conf.d/beats.conf that's used for all beats inputs (I will configure this to use more security best practices such as https for the input later but I'm just trying to get the basic functionality to work right now):

input {
  beats {
    port => 5044
  }
}
output {
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => "https://x.x.x.x:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      action => "create"
      pipeline => "%{[@metadata][pipeline]}"
      user => "xxx"
      password => "xxx"
      cacert => 'xxxt'
    }
  } else {
    elasticsearch {
      hosts => "https://x.x.x.x:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      action => "create"
      user => "xxx"
      password => "xxx"
      cacert => 'xxx'
    }
  }
}

When I tried adding pipeline => "%{[@metadata][pipeline]}" to the "else" block I get the following error:

:response=>{"create"=>{"_index"=>"winlogbeat-8.8.2", "_id"=>nil, "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"pipeline with id [%{[@metadata][pipeline]}] does not exist"}}}}
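
The literal %{[@metadata][pipeline]} in that error is what Logstash emits when the referenced field is missing from the event, so the Winlogbeat events appear to arrive without [@metadata][pipeline]. A minimal sketch of one way to confirm this, assuming a temporary extra output is acceptable in the test environment: the stdout output with the rubydebug codec can print the @metadata fields, which are hidden by default.

```
output {
  # Temporary debugging output; remove it once the metadata has been inspected.
  # metadata => true makes rubydebug include the [@metadata] fields,
  # so you can see whether [@metadata][pipeline] is present per event.
  stdout {
    codec => rubydebug { metadata => true }
  }
}
```

With this in place, comparing an Auditbeat or Filebeat event with a Winlogbeat event in the Logstash log should show which of them carry a pipeline value.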

Assuming you have the pipelines installed in Elasticsearch, can you try using this general logic in the Logstash config?

  else if [@metadata][beat] == "winlogbeat" {
    elasticsearch {
      ...
      pipeline => "%{[@metadata][beat]}-%{[@metadata][version]}-routing"
    }
  }
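
For reference, here is a sketch of how that branch might slot into the output section of the original beats.conf. It assumes the Winlogbeat ingest pipelines are already loaded in Elasticsearch and that the elided settings are the same as in the other branches; the hosts, user, password and cacert values are the redacted placeholders from the question, not real values.

```
output {
  if [@metadata][pipeline] {
    # Events that already name their ingest pipeline in the metadata.
    elasticsearch {
      hosts => "https://x.x.x.x:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      action => "create"
      pipeline => "%{[@metadata][pipeline]}"
      user => "xxx"
      password => "xxx"
      cacert => 'xxx'
    }
  } else if [@metadata][beat] == "winlogbeat" {
    # Winlogbeat events carry no [@metadata][pipeline], so the routing
    # pipeline name is built from the beat name and version instead.
    elasticsearch {
      hosts => "https://x.x.x.x:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      action => "create"
      pipeline => "%{[@metadata][beat]}-%{[@metadata][version]}-routing"
      user => "xxx"
      password => "xxx"
      cacert => 'xxx'
    }
  } else {
    # Everything else is indexed without an ingest pipeline.
    elasticsearch {
      hosts => "https://x.x.x.x:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      action => "create"
      user => "xxx"
      password => "xxx"
      cacert => 'xxx'
    }
  }
}
```

For the winlogbeat-8.8.2 index seen in the error above, the middle branch would resolve the pipeline name to winlogbeat-8.8.2-routing.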

Hi Andrew,

That actually seems to have solved the problem, thank you very much for the quick help!

I created an issue to improve the documentation for Winlogbeat modules.
