Syslog source ip

Hi,
The syslog input plugin is creating a field based on the source IP of the syslog message. However for some hosts this is put into [host] and for other hosts it is going into [host][ip]
I can't figure out why this is not consistent.

    input {
      syslog {
        # -----------------------------------------------
        # port 514 is redirected to 1514 by firewalld
        # firewall-cmd --list-all
        # -----------------------------------------------
        port => 1514
        codec => plain
        syslog_field => "message"
        grok_pattern => "<%{POSINT:priority}>%{POSINT:[seq]} %{TIMESTAMP_ISO8601:timestamp} %{HOSTNAME:[_host.name]} %{GREEDYDATA:[syslog_message]}"
        # declare add_field once with all entries: repeating the same option
        # key in one plugin block relies on Logstash merging duplicates and
        # raises "duplicate keys" warnings/errors on newer versions
        add_field => {
            "[@metadata][es_index]" => "network-logs"
            "[@metadata][raw_message]" => "%{message}"
            "[input][type]" => "rsyslog"
        }
      }
    }

Looking at the filter I assume you are also trying to use ECS? I fear there would be a mapping error anyway if a value sometimes lands in [host] and other times in [host][ip].

In my experience the syslog input plugin sometimes acts strangely and is a bit picky - so I switched to the tcp and udp input plugins and set up the filters mostly on my own to handle all kinds of different syslog patterns. For that I also use an additional patterns file:

/etc/logstash/patterns/syslog_patterns

# orientation: match => { "message" => "\<%{NONNEGINT:priority}\>(%{NONNEGINT} | )(%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) %{SYSLOGHOST:syslog_host} %{SYSLOGPROG:syslogprog}(: | )%{GREEDYDATA:message}" }
# any printable ASCII run (RFC 5424 PRINTUSASCII)
SYSLOG5424PRINTASCII [!-~]+

# RFC3164-style base: timestamp, optional facility, host, optional program.
# NOTE(review): the "+" after %{SYSLOGHOST:syslog_host} repeats the whole host
# pattern (inherited from the stock logstash SYSLOGBASE2); with a repeated
# named capture only the last repetition is kept — confirm it is intentional.
SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:syslog_host}+(?: %{SYSLOGPROG:syslogprog}:|)
SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:syslog_pam_module}\(%{DATA:syslog_pam_caller}\): session %{WORD:syslog_pam_session_state} for user %{USERNAME:syslog_username}(?: by %{GREEDYDATA:syslog_pam_by})?

CRON_ACTION [A-Z ]+
CRONLOG %{SYSLOGBASE} \(%{USER:syslog_user}\) %{CRON_ACTION:syslog_action} \(%{DATA:message}\)

# "<priority>" prefix, e.g. "<13>"
SYSLOGPRI <%{NONNEGINT:priority}>

# IETF 5424 syslog(8) format (see http://www.rfc-editor.org/info/rfc5424)
SYSLOG5424SD \[%{DATA}\]+
SYSLOG5424BASE %{SYSLOGPRI}%{NONNEGINT:syslog_ver} +(?:%{TIMESTAMP_ISO8601:syslog5424_ts}|-) +(?:%{IPORHOST:syslog_host}|-) +(-|%{SYSLOG5424PRINTASCII:syslog5424_app}) +(-|%{SYSLOG5424PRINTASCII:syslog5424_proc}) +(-|%{SYSLOG5424PRINTASCII:syslog5424_msgid}) +(?:%{SYSLOG5424SD:syslog5424_sd}|-|)

SYSLOG5424LINE %{SYSLOG5424BASE} +%{GREEDYDATA:syslog5424_msg}

# IETF 3164 syslog format
SYSLOGLINE (?:%{SYSLOGPRI})%{SYSLOGBASE2} %{GREEDYDATA:message}

# unix epoch time
# NOTE(review): these digit patterns are unanchored; UNIXEPOCH can first match
# the leading 10 digits of a 13-digit ms timestamp, and the line only succeeds
# via regex backtracking into UNIXEPOCHMS2 — works, but worth confirming.
UNIXEPOCH (\d){10}
UNIXEPOCHMS1 ((\d){10}\.(\d){3})
UNIXEPOCHMS2 (\d){13}
SYSLOGBASEUNIX (?:%{UNIXEPOCH:log_timestamp}|%{UNIXEPOCHMS1:log_timestamp}|%{UNIXEPOCHMS2:log_timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:syslog_host}+(?: %{SYSLOGPROG:syslogprog}:|)
SYSLOGLINEUNIX (?:%{SYSLOGPRI})%{SYSLOGBASEUNIX} %{GREEDYDATA:message}

The logstash pipeline config file looks similar to this (I had to remove/change some stuff before posting - I hope I didn't remove too much):

/etc/logstash/conf.d/filter-syslog.conf

input {
    tcp {
        # 514 is a privileged port; the firewall redirects 514 -> 1514
        port => 1514
        # we need to change that some day
        type => syslog
    }

    udp {
        port => 1514
        # we need to change that some day
        type => syslog
        # read-buffer size in bytes for a single incoming datagram
        buffer_size => 8192
        codec => plain
    }
}


filter {
    # grok attempt for syslog RFC5424 or RFC3164
    if "_grokparsesuccess" not in [tags] {
        # Manually parse the log, as we want to support both RFC3164 and RFC5424
        grok {
            # load slightly changed default syslog patterns
            patterns_dir => ["/etc/logstash/patterns/"]
            match => { "message" => "%{SYSLOG5424LINE}" }
            # add_field/add_tag only run when the match succeeds
            add_field => [ "received_from", "%{host}" ]
            add_tag => [ "_grokparsesuccess" ]
            overwrite => [ "message" ]
        }

        if [syslog5424_ts] {
            # Handle RFC5424 formatted Syslog messages
            mutate {
                remove_field => [ "message", "host" ]
                add_tag => [ "syslog5424" ]
            }

            mutate {
                # Use a friendlier naming scheme
                # NOTE(review): the custom SYSLOG5424BASE pattern already
                # captures the host as "syslog_host" and the version as
                # "syslog_ver", so the "syslog5424_host" rename and the
                # "syslog5424_ver" removal below look like no-ops — confirm.
                rename => {
                    "syslog5424_app"  => "syslogprog"
                    "syslog5424_msg"  => "message"
                    "syslog5424_host" => "syslog_host"
                }
                remove_field => [ "syslog5424_ver", "syslog5424_proc" ]
            }

            if [syslog5424_sd] {
                # All structured data needs to be in format [key=value,key=value,...]
                mutate {
                    # split at "][" brackets
                    split => { "syslog5424_sd" => "][" }
                }

                mutate {
                    # Remove any brackets in this array-field
                    gsub => [ "syslog5424_sd", "[\[\]]", "" ]
                }

                mutate {
                    rename => { "syslog5424_sd" => "[log][syslog][syslog5424][sd_data]" }
                }
            }

            date {
                match => [ "syslog5424_ts", "ISO8601" ]
                remove_field => [ "syslog5424_ts", "timestamp" ]
            }
          }
          else {
              # Handle RFC3164 formatted Syslog messages
              grok {
                    # load slightly changed default syslog patterns
                    patterns_dir => ["/etc/logstash/patterns/"]
                    match => { "message" => "%{SYSLOGLINE}" }
                    add_field => [ "received_from", "%{host}" ]
                    # NOTE(review): two add_tag lines in one plugin rely on
                    # Logstash merging duplicate option keys — confirm, or
                    # combine into a single add_tag array.
                    add_tag => [ "_grokparsesuccess" ]
                    add_tag => [ "syslog3164" ]
                    overwrite => [ "message" ]
              }
          }
    }


    # grok attempt for syslog messages with epoch-timestamps
    if "_grokparsesuccess" not in [tags] {
        # Manually parse the log, as we want to support both RFC3164 and RFC5424
        grok {
            # load slightly changed default syslog patterns
            patterns_dir => ["/etc/logstash/patterns/"]
            match => { "message" => "%{SYSLOGLINEUNIX}" }
            add_field => [ "received_from", "%{host}" ]
            # NOTE(review): duplicate add_tag keys, see note above
            add_tag => [ "_grokparsesuccess" ]
            add_tag => [ "syslogunixepoch" ]
            overwrite => [ "message" ]
        }
    }

    # last grok attempt for syslog messages in most simple format - e.g. sent via old version of logger command
    # <5>Jul 14 15:20:25 root: some test message
    if "_grokparsesuccess" not in [tags] {
        # Manually parse the log, as we want to support both RFC3164 and RFC5424
        grok {
            match => { "message" => "\<%{NONNEGINT:priority}\>(%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) %{SYSLOGPROG}: %{GREEDYDATA:message}" }
            add_field => [ "received_from", "%{host}" ]
            # NOTE(review): duplicate add_tag keys, see note above
            add_tag => [ "_grokparsesuccess" ]
            add_tag => [ "simple_syslog" ]
            overwrite => [ "message" ]
        }
    }

    # this will replace the @timestamp with the timestamp from the event if in correct format
    # check if log_timestamp is in correct format
    # Each date filter below tries one format; the first one that matches sets
    # @timestamp, tags _dateparsesuccess and removes log_timestamp — the
    # remaining date filters then do nothing because the field is gone.
    date {
        match => [ "log_timestamp", "MMM dd yyyy HH:mm:ss" ]
        add_tag => [ "_dateparsesuccess" ]
        remove_field => [ "log_timestamp" ]
    }

    date {
        match => [ "log_timestamp", "MMM  d yyyy HH:mm:ss" ]
        add_tag => [ "_dateparsesuccess" ]
        remove_field => [ "log_timestamp" ]
    }

    date {
        match => [ "log_timestamp", "ISO8601" ]
        add_tag => [ "_dateparsesuccess" ]
        remove_field => [ "log_timestamp" ]
    }

    date {
        match => [ "log_timestamp", "MMM dd HH:mm:ss" ]
        add_tag => [ "_dateparsesuccess" ]
        remove_field => [ "log_timestamp" ]
    }

    date {
        match => [ "log_timestamp", "MMM  d HH:mm:ss" ]
        add_tag => [ "_dateparsesuccess" ]
        remove_field => [ "log_timestamp" ]
    }

    date {
        match => [ "log_timestamp", "UNIX" ]
        add_tag => [ "_dateparsesuccess" ]
        remove_field => [ "log_timestamp" ]
    }

    date {
        match => [ "log_timestamp", "UNIX_MS" ]
        add_tag => [ "_dateparsesuccess" ]
        remove_field => [ "log_timestamp" ]
    }

    # because all grok-filters are taken into account there would be also a '_grokparsefailure'-tag,
    # we don't need it if it was at least correctly filtered by one of the groks
    if ("_grokparsesuccess" in [tags]) {
        # syslog_pri extracts facility and loglevel from the "syslog_pri"ority-field
        syslog_pri { syslog_pri_field_name => "priority" }

        mutate{
            remove_tag => [ "_grokparsefailure" ]
        }
    }

    ###########################################################
    ###### Final preps for ECS
    # remove the _dateparsefailure if we find a success-tag
    if ("_dateparsesuccess" in [tags]) {
        mutate {
            remove_tag => [ "_dateparsefailure" ]
        }
    }

    # we have to remove the host field first so we can reuse the field name for ECS style later
    # it seems that we cannot do it in the same mutate-statement!
    if "_grokparsesuccess" in [tags] {
        mutate {
            remove_field => [ "host" ]
        }

        # in simple_syslog messages there is no explicit syslog_host :|
        if "simple_syslog" not in [tags] {
            mutate {
                # we add stuff to arrays
                add_field => { "[host][ip]" => "%{received_from}" }
                add_field => { "[host][name]" => "%{syslog_host}" }
            }
        } else {
            mutate {
                # we add stuff to arrays
                add_field => { "[host][ip]" => "%{received_from}" }
                add_field => { "[host][name]" => "%{received_from}" }
            }
        }
    } else {
        # if no grok matched, we have to get the stuff from the host-field
        mutate {
            add_field => { "received_from" => "%{host}" }
        }

        # we have to remove the initial host-field
        mutate {
            remove_field => [ "host" ]
        }

        # we have to add this to the host.ip array
        mutate {
            add_field => { "[host][ip]" => "%{received_from}" }
            # as we do not have a hostname because of bad parsing we have to leave the host.name field empty
        }
    }

    # for keeping to ECS
    if [syslog_severity] {
        mutate {
            add_field => { "[log][level]" => "%{syslog_severity}" }
        }
    }

    # finally we rename and remove fields
    mutate {
        # we can rename our simple string/text/number-fields ### better - change later https://www.elastic.co/guide/en/ecs/current/ecs-log.html
        rename => {
            "pid"                       => "[process][pid]"
            "program"                   => "[process][name]"
            "syslogprog"                => "[log][logger]"
            "priority"                  => "[log][syslog][priority]"
            "syslog_facility"           => "[log][syslog][facility][name]"
            "syslog_facility_code"      => "[log][syslog][facility][code]"
            "syslog_severity"           => "[log][syslog][severity][name]"
            "syslog_severity_code"      => "[log][syslog][severity][code]"
            "syslog_ver"                => "[log][syslog][version]"
            "received_at"               => "[event][created]"
            "loglevel"                  => "[log][level]"
        }

        # we remove unneeded fields with info we already have somewhere else in
        # it's in host.name and host.ip if applicable
        remove_field => [ "syslog_host" ]
        # is event.created
        remove_field => [ "received_at" ]
        # is in host.ip if applicable
        remove_field => [ "received_from" ]

        # we add event.dataset so SIEM part in Kibana looks nice
        add_field => { "[event][dataset]" => "%{type}" }
        add_field => { "[event][type]" => "%{type}" }

        # and finally we remove the type as this is duplicated info in event.dataset
        remove_field => [ "type" ]

        # some other ECS best practices
        add_field => { "[ecs][version]" => "1.5.0" }
    }
}

output {
    # placeholder — the real elasticsearch output settings were removed before posting
    elasticsearch {...}
}

Those filters should be able to handle the following syslog messages, tested on CentOS and Fedora (Fedora has a newer logger version than CentOS):

# logger - version util-linux-2.35.1 has the --sd-id option and --rfc3164, else those commands will be ignored
logger --udp --server 127.0.0.1 --port 1514 --sd-id zoo@123 --sd-param tiger=\"hungry\" --sd-param zebra=\"running\" --sd-id manager@123 --sd-param onMeeting=\"yes\"  "this is a rfc5424 message"
logger --udp --rfc3164 --server 127.0.0.1 --port 1514 "some rfc3164 test message"
logger --udp --server 127.0.0.1 --port 1514 "some rfc5424 test message with newer logger version - older versions have some different format"
echo "<17>$(date +'%b %e %T') 192.168.123.123 service_bibaboo[123]: also some syslog conform message" | nc -u 127.0.0.1 1514
echo "<18>$(date +'%Y-%m-%dT%H:%M:%S.%3N%:z') 192.168.123.123 service_bibaboo[123]: also some syslog conform message with +04:00 timezone" | nc -u 127.0.0.1 1514
echo "<19>$(date +'%Y-%m-%dT%H:%M:%S.%3N') 192.168.123.123 service_bibaboo[123]: also some syslog conform message without timezone info" | nc -u 127.0.0.1 1514
echo "<20>$(date +'%s') 192.168.123.123 service_bibaboo[123]: also some syslog conform message with epochtime" | nc -u 127.0.0.1 1514
echo "<21>$(date +'%s.%3N') 192.168.123.123 service_bibaboo[123]: also some syslog conform message with epochtime with milliseconds" | nc -u 127.0.0.1 1514
echo "<13>1 $(date +'%Y-%m-%dT%H:%M:%S.%3N%:z') amachine administrator - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"103261\"] some rfc5424 test message simulating newer logger version message" | nc -u 127.0.0.1 1514
echo "<13>1 $(date +'%Y-%m-%dT%H:%M:%S.%3N%:z') amachine administrator - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"87912\"][zoo@123 tiger=\"hungry\" zebra=\"running\"][manager@123 onMeeting=\"yes\"] this is a rfc5424 message simulating newer logger version message with additional sd-params and sd-id" | nc -u 127.0.0.1 1514
echo "no parsing with this message" | nc -u 127.0.0.1 1514

There surely are funny-looking things in the filters :wink: that can be optimized, but currently this filter works well with ELK 7.9 for us. It still needs some trimming and so on. Maybe it can help you.

I am fixing the issue like this, which is working. It's just very odd:

        # Workaround: normalize the inconsistent syslog-input output so the
        # source address always ends up in [host][ip] (ECS-style object).
        if [host][ip] {
            # [host] already arrived as an object — copy the grok-captured
            # hostname in, then drop the temporary _host field.
            # NOTE(review): the input's grok_pattern names its capture
            # "[_host.name]" while this reads "%{[_host][name]}" (nested
            # field) — verify these two actually reference the same field.
            mutate {
                add_field => {
                    "[host][name]" => "%{[_host][name]}"
                }
            }
            mutate {
                remove_field => ["_host"]
            }
        }
        else {
            # [host] arrived as a plain string: stash it under [_host][ip],
            # remove the scalar [host] (separate mutate so ordering is safe),
            # then rename _host back to host so the result is always an
            # object with an "ip" key.
            mutate {
              add_field => {
                "[_host][ip]" => "%{[host]}"
              }
            }
            mutate {
              remove_field => ["host"]
            }
            mutate {
              rename => { "_host" => "host"}
            }
        }