Collecting syslog from several different vendors

Hello.
We have been given the task of setting up an Elastic Stack server that collects syslog from different vendors in the environment, visualizes it in Kibana, and forwards the syslog to a CERT department.
As a start we will collect syslog from HP printers, Axis cameras, HP switches, iDRAC, QNAP, ESXi, iLO, and so on.
One of my colleagues suggested that we collect all syslog into a single index named "syslog", and set a metadata field, indexname, in the input section of the pipeline configuration.
This is how we have set up the pipeline configuration to collect syslog from the different vendors:

# syslog-pipeline.conf
# For collection of syslog on several ports
# Changes: 
# 2021-05-07 SB: This is the first version

input {
  tcp  { 
    port => "514"
    codec => plain { charset => "ISO-8859-1" }
    type => "syslog"
    add_field => { "[@metadata][indexname]" => "syslog" } # indexname is used downstream to choose which index or data stream to write to
  }
  udp  {
    port => "514"
    codec => plain { charset => "ISO-8859-1" }
    type => "syslog"
    add_field => { "[@metadata][indexname]" => "syslog" } 
  }
  udp  {
    port => "7514"
    codec => plain { charset => "ISO-8859-1" }
    type => "syslog_idrac"
    add_field => { "[@metadata][indexname]" => "syslog" } 
  }
  tcp  { 
    port => "7515"
    codec => plain { charset => "ISO-8859-1" }
    type => "syslog_procurve"
    add_field => { "[@metadata][indexname]" => "syslog" } 
  }
  tcp  { 
    port => "7516"
    codec => plain { charset => "ISO-8859-1" }
    type => "syslog_axis"
    add_field => { "[@metadata][indexname]" => "syslog" } 
  }
  tcp  { 
    port => "7517"
    codec => plain { charset => "ISO-8859-1" }
    type => "syslog_qnap"
    add_field => { "[@metadata][indexname]" => "syslog" } 
  }
  tcp  { 
    port => "7518"
    codec => plain { charset => "ISO-8859-1" }
    type => "syslog_laserjet"
    add_field => { "[@metadata][indexname]" => "syslog" } 
  }
  tcp  { 
    port => "8514"
    codec => plain { charset => "ISO-8859-1" }
    type => "syslog_esxi"
    add_field => { "[@metadata][indexname]" => "syslog" } 
  }
  udp  { 
    port => "9514"
    codec => plain { charset => "ISO-8859-1" }
    type => "syslog_ilo"
    add_field => { "[@metadata][indexname]" => "syslog" } 
  }
}

filter {
  clone { # clones the input and lets us have a clean forward to CERT while still adding filters to the data sent to elasticsearch
    clones => ['to_cert']
  } # end of clone
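  # Note: the clone filter sets [type] on each cloned event to the clone name,
  # so the 'to_cert' conditional below matches only the clones.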
  if [type] == 'to_cert' {
    mutate {
      add_field => { "[@metadata][endpoint]" => "to_cert" } # set endpoint field to point to cert
      remove_field => ['type']  # Remove the type field from the CERT forward to ensure they receive clean logs
    }
  } # end of if type eq to_cert
#  if [type] == 'syslog' {
  if [type] in ['syslog','syslog_idrac','syslog_procurve','syslog_axis','syslog_qnap','syslog_laserjet','syslog_esxi','syslog_ilo'] {
    mutate {
      add_field => { "[@metadata][endpoint]" => "to_elastic" } # set endpoint field to point to elastic
    }
    grok {                    # Match syslog data and add fields
      match => { "message" => "^(?:%{DATA:syslog_start})(?:<%{NONNEGINT:[log][syslog][priority]:int}>). +(?:%{TIMESTAMP_ISO8601:syslog_timestamp}|-) +(?:%{SYSLOGHOST:syslog_hostname}|-) +(?:%{DATA:syslog_app}|-) +(?:%{WORD:syslog_proc}|-) +(?:%{WORD:syslog_msgid}|-) +(?:\[%{DATA:syslog_sd}\]|-|) %{GREEDYDATA:syslog_message}$"}
      add_field => [ "syslog_received_at", "%{@timestamp}" ]
      add_field => [ "syslog_received_from", "%{host}" ]
    }
    date { # Match date in syslog message and set timestamp field to this
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"]
      timezone => "UTC"
    }
  } # end of if type in [xxx, yyy ...]
} # end of filter

output {
# Send clean logs to CERT and send the rest to elastic.
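# Note: CertBuf and ElasticBuf are pipeline-to-pipeline addresses; each must match
# the address => of a pipeline input in the corresponding downstream pipeline.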
  if [@metadata][endpoint] == 'to_cert' {
    pipeline {
      send_to => [CertBuf]
    }
  } else if [@metadata][endpoint] == 'to_elastic' { # else send to Elasticsearch
    pipeline { 
      send_to => [ElasticBuf]
    }
  }
#  stdout { codec => rubydebug }
} # End of output

I have not come to any conclusion yet on whether we should do it like this, but I don't think it would be a very good idea to put all syslog from different vendors into only one index.
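If we split per vendor instead, I imagine we would only need to vary the metadata value per input and consume it downstream. A rough sketch (the index name, hosts value, and date suffix here are just placeholders for illustration):

  # in each input block, a per-vendor value instead of "syslog", e.g. for iDRAC:
  add_field => { "[@metadata][indexname]" => "syslog-idrac" }

  # in the downstream pipeline, assuming it ends in an elasticsearch output:
  elasticsearch {
    hosts => ["localhost:9200"]                          # placeholder
    index => "%{[@metadata][indexname]}-%{+YYYY.MM.dd}"  # per-vendor daily index
  }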

What do you think?
Any suggestions on how you would go about setting up Logstash to collect syslog from different vendors?

Having multiple ports open for the same service (syslog) is not advised. It's better to send all syslog to one port and filter it at a later stage. The setup you described is very prone to misconfiguration, so you should try to keep everything consistent.
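A single pair of listeners would be enough (a sketch, reusing the charset from your config):

input {
  udp {
    port => 514
    codec => plain { charset => "ISO-8859-1" }
  }
  tcp {
    port => 514
    codec => plain { charset => "ISO-8859-1" }
  }
}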
I have a system set up where I monitor syslog from Netgear, vCenter and iDRAC. My filtering goes like this:

filter {
  if [host] == '192.168.0.226' {  # match events from this source address
    mutate {
      add_tag => ["Netgear"]
    }
  }
}

Inside this conditional you could also set the different index names, as you have in your example; see the sketch below.
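For example (a sketch; 'syslog-netgear' is just an illustrative name):

filter {
  if [host] == '192.168.0.226' {
    mutate {
      add_tag => ["Netgear"]
      add_field => { "[@metadata][indexname]" => "syslog-netgear" }
    }
  }
}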

My experience tells me that syslog != syslog; there are different versions of syslog, and some equipment is just a bit weirder than others.
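For example, the same event looks quite different in the old BSD format (RFC 3164) versus the newer RFC 5424 format:

<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8
<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - 'su root' failed for lonvick on /dev/pts/8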

Avoid having different ports; not every device can even be configured to send to a non-standard port.

In my situation, I'm ingesting huge rates of logging from various syslog clients. Because I have a cluster of machines, I use Pacemaker to float various IPs across the cluster. For example, I might have the following IPs floating among the machines I refer to as my 'submission tier':

  • one IP for general network devices
  • one IP for a special-case elephant-flow network device (e.g. firewall, NAT logs)
  • one IP for servers

and as many more as you deem necessary and easy enough to support.

I'm actually using rsyslogd to implement the submission tier: it encapsulates each message with some data such as IP addresses and sends it on to Kafka, and Logstash then acts as the enrichment tier before sending it to Elasticsearch.
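On the Logstash side, consuming from Kafka might look something like this (a sketch; the broker address and topic name are made up):

input {
  kafka {
    bootstrap_servers => "kafka:9092"  # hypothetical broker
    topics => ["syslog"]               # hypothetical topic written by rsyslogd
    codec => json
  }
}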

One thing to bear in mind is how you bind a socket to an IP while allowing that IP to not exist on the machine (yet), because it's currently active on another host in the cluster. In Linux terms, this capability is IP_FREEBIND. Rsyslogd supports this (for UDP); Logstash does not.

Avoid TCP syslog! (RELP maybe, but not general TCP.) If the TCP window closes due to congestion and the client socket buffer then fills up, the syslog(3) library function will block (!). I managed to bring down a network once this way: a BIND (DNS) server was blocking on writing syslog because the logging server (just a regular syslogd at the time) had a full disk.
