Logstash config for windows events

Hi All,

I'm very new to logstash, so i'm new to manually configuring its config for event filtering.

What i am planning:

1:
Collect windows events.
Sent from a remote location by windows event forwarding service on winrm port 5985 (source initiated flow).

2:
Filter by domain 'and' event id to create a log file from the filter, this will make 6 log files
3 domains with 2 logfiles depending on event ID

3:
Filter by just domain, create a log file from the filter, this will make 1 log file

4:
Add another filter for anything missed, or unexpected from the previous filters
and create a logfile this brings us to 8 log files

5:
Send the logs from each logfile onto a remote destination using a different port for each logfile

.
.
What i need from the config.

If the destination Port goes down, the log file for the specific filter/port combo continues to receive data and grow but understands it has not been able to send data.

If all the destination ports or destination IP goes down, then each filter/port combo will continue to receive data and grow but understands it has not been able to send data.

Logstash automatically understands the destination has recovered, and continues or potentially restarts the data flow as per the filter/port combo, starting with the oldest unsent log.
I am ok with max retry attempts being infinite, as we can define a less aggressive retry interval set.

Each logfile has a max size, and when reached the oldest events are dropped.
Potentially succesfully sent logfiles are removed from the logfile.

.
.
.
Here is a config i have created, it is missing the full output section as i have knowledge gaps, but has a snippet of how i am thinking at this time.
I do not know if i need a DLQ

input {
  winlog {
    port => 5985
  }
}

filter {
  # Filter for events where domain contains ASIAPACIFIC and Event ID is 4624
  if [domain] =~ /ASIAPACIFIC/i and [event_id] == 4624 {
    file {
      path => "/path/to/save/ASIAPACIFIC-4624.log"
      codec => "json_lines"
    }
    tcp {
      host => "DEST_IP"
      port => 8560
      codec => "json_lines"
    }
  } else if [domain] =~ /ASIAPACIFIC/i and ![event_id] == 4624 {
    # Filter for events where domain contains ASIAPACIFIC and Event ID is not 4624
    file {
      path => "/path/to/save/ASIAPACIFIC-NOT-4624.log"
      codec => "json_lines"
    }
    tcp {
      host => "DEST_IP"
      port => 8561
      codec => "json_lines"
    }
  } else if [domain] =~ /EMEA/i and [event_id] == 4624 {
    # Filter for events where domain contains EMEA and Event ID is 4624
    file {
      path => "/path/to/save/EMEA-4624.log"
      codec => "json_lines"
    }
    tcp {
      host => "DEST_IP"
      port => 8562
      codec => "json_lines"
    }
  } else if [domain] =~ /EMEA/i and ![event_id] == 4624 {
    # Filter for events where domain contains EMEA and Event ID is not 4624
    file {
      path => "/path/to/save/EMEA-NOT-4624.log"
      codec => "json_lines"
    }
    tcp {
      host => "DEST_IP"
      port => 8563
      codec => "json_lines"
    }
  } else if [domain] =~ /AMERICAS/i and [event_id] == 4624 {
    # Filter for events where domain contains AMERICAS and Event ID is 4624
    file {
      path => "/path/to/save/AMERICAS-4624.log"
      codec => "json_lines"
    }
    tcp {
      host => "DEST_IP"
      port => 8564
      codec => "json_lines"
    }
  } else if [domain] =~ /AMERICAS/i and ![event_id] == 4624 {
    # Filter for events where domain contains AMERICAS and Event ID is not 4624
    file {
      path => "/path/to/save/AMERICAS-NOT-4624.log"
      codec => "json_lines"
    }
    tcp {
      host => "DEST_IP"
      port => 8566
      codec => "json_lines"
    }
  } else if [domain] =~ /ROOT/i {
    # Filter for events where domain contains CPQROOT
    file {
      path => "/path/to/save/ROOT.log"
      codec => "json_lines"
    }
    tcp {
      host => "DEST_IP"
      port => 8567
      codec => "json_lines"
    }
  } else {
    # Filter for events that do not match any above filter
    file {
      path => "/path/to/save/unmatched.log"
      codec => "json_lines"
    }
    tcp {
      host => "DEST_IP"
      port => 8568
      codec => "json_lines"
    }
  }
}
output {
  # Example output plugin with retry and storage options
  tcp {
    host => "dest_ip"
    port => 8560
    retry_max_attempts => 0
    retry_initial_interval => 2
    retry_max_interval => 60
    retry_on_failure => true

    # Configure dead letter queue
    dead_letter_queue {
      enable => true
      path => "/path/to/dead_letter_queue"
      max_bytes => 1073741824 # 1GB
    }
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.