Syslog with millisecond pattern not matching

I am using the following:

input {
  tcp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss.SSS", "MMM dd HH:mm:ss.SSS" ]
    }
    if "_grokparsefailure" in [tags] {
      drop { }
    }
  }
}

output {
  elasticsearch { hosts => ["${ELASTICSEARCH_ENDPOINT}"] }
}

But I am seeing messages failing to insert with:

[2018-05-04T19:22:07,407][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2018.05.04", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x1d18db5e>], :response=>{"index"=>{"_index"=>"logstash-2018.05.04", "_type"=>"doc", "_id"=>"vnOaLGMBQ97cQt5VuMec", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [syslog_timestamp]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Invalid format: \"May 4 19:22:07.321\""}}}}}

The syslog date format looks right:

May  4 19:22:07.321
should be matching
MMM  d HH:mm:ss.SSS

Unless configured otherwise, the date filter stores the parsed result in the @timestamp field, and your syslog_timestamp field is left as-is. Why not just remove that field?
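
For example (a sketch using the date filter's common remove_field option, which is only applied when the date parses successfully):

date {
  match => [ "syslog_timestamp", "MMM  d HH:mm:ss.SSS", "MMM dd HH:mm:ss.SSS" ]
  # remove_field is a common option that only runs on a successful match,
  # so failed events keep the field for debugging
  remove_field => [ "syslog_timestamp" ]
}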

Our application code (which can't be changed) requires the syslog_timestamp field. I think %{SYSLOGTIMESTAMP:syslog_timestamp} is not matching in the grok because that pre-defined pattern does not include milliseconds.

Any idea how to update that to include milliseconds?

The error message indicates that the syslog_timestamp field indeed contains "May 4 19:22:07.321" so I don't think the grok filter is the problem. What does an example event produced by Logstash look like? Use a stdout { codec => rubydebug } output.
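
For example (a minimal sketch, added alongside or instead of your elasticsearch output):

output {
  # Print each event in a readable Ruby-debug format so you can inspect fields
  stdout { codec => rubydebug }
}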

syslog_timestamp seems to be as expected, matching what I am defining in date => match:

{
                "host" => "ip-172-31-16-159.us-west-2.compute.internal",
    "syslog_timestamp" => "May  4 20:06:25.196",
               "msgid" => "9f23a88d-4fd6-11e8-b234-5d9545c6b53f",
            "@version" => "1",
      "syslog_message" => "[RCPT",
     "syslog_hostname" => "outbound2",
      "syslog_program" => "smtpd",
       "received_from" => "ip-172-31-16-159.us-west-2.compute.internal",
             "message" => "<23>May  4 20:06:25.196 outbound2 smtpd: [9f23a88d-4fd6-11e8-b234-5d9545c6b53f] [RCPT",
                "port" => 28088,
                "type" => "syslog",
         "received_at" => "2018-05-04T20:06:46.765Z",
          "@timestamp" => 2018-05-04T20:06:25.196Z
}
date {
  match => [ "syslog_timestamp", "MMM  d HH:mm:ss.SSS", "MMM dd HH:mm:ss.SSS" ]
}

I think the problem is that the built-in grok pattern %{SYSLOGTIMESTAMP} does not have milliseconds. How can I define a custom grok SYSLOGTIMESTAMP pattern that includes milliseconds?

There is nothing wrong with either grok (syslog_timestamp has the value you want) or date (so does @timestamp). The problem occurs when Elasticsearch tries to index the syslog_timestamp field: it wants to convert syslog_timestamp to a date, but cannot figure out what the format is. As Magnus said, you have already extracted @timestamp from it, so why not remove the field using mutate+remove_field?

Hi Badger,

Sorry, I am not that familiar with Logstash. Can you give me an example of what my configuration would look like using mutate+remove_field?

input {
  tcp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss.SSS", "MMM dd HH:mm:ss.SSS" ]
    }
    mutate { remove_field => [ "syslog_timestamp" ] }
    if "_grokparsefailure" in [tags] {
      drop { }
    }
  }
}

output {
  elasticsearch { hosts => ["${ELASTICSEARCH_ENDPOINT}"] }
}

That worked, but it completely removes the syslog_timestamp field, which the application code uses.

Also, why is received_at before @timestamp? Shouldn't @timestamp be the syslog time, which should always be before Logstash receives it?

"received_at": [
  "2018-05-04T20:44:00.176Z"
],
"@timestamp": [
  "2018-05-04T20:44:00.372Z"
]

Here @timestamp matches syslog_timestamp. I cannot speak to received_at.

What do you want to have in Elasticsearch? Do you want syslog_timestamp to be the string "May  4 20:06:25.196", or do you want it to be a date?
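
For example, if you want it to be a date, the date filter's target option can write the parsed value into syslog_timestamp instead of @timestamp (a sketch, assuming the same formats as above):

date {
  match => [ "syslog_timestamp", "MMM  d HH:mm:ss.SSS", "MMM dd HH:mm:ss.SSS" ]
  # Store the parsed result back into the field instead of @timestamp, so
  # Elasticsearch receives a proper date rather than an unparseable string
  target => "syslog_timestamp"
}

Note that with target set, @timestamp is left alone, so you would keep your original date filter as well if you still want @timestamp to reflect the syslog time.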

OK, the developers said it is OK to remove syslog_timestamp after all. So the only remaining question: is received_at the syslog timestamp, and @timestamp the time that Logstash processed the message?

I am seeing @timestamp AFTER received_at always.

You mean BEFORE, right? @timestamp is set to the timestamp embedded in the syslog message. received_at is set to the value of @timestamp before you parse syslog_timestamp, which will be when Logstash received it, which will always be after syslog sent it.
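
To make the ordering concrete, here is a simplified, annotated version of the filter above (the grok is abbreviated, not your exact pattern):

filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{GREEDYDATA:syslog_message}" }
    # At this point @timestamp is still the time Logstash created the event,
    # so received_at captures the receive time.
    add_field => [ "received_at", "%{@timestamp}" ]
  }
  # date then overwrites @timestamp with the time parsed out of the message
  # itself, which is why the two fields can legitimately differ.
  date {
    match => [ "syslog_timestamp", "MMM  d HH:mm:ss.SSS", "MMM dd HH:mm:ss.SSS" ]
  }
}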

No, that is what I am saying, and why I am confused: it is the opposite.

"received_at": "2018-05-04T21:06:41.062Z",
"@timestamp": "2018-05-04T21:06:41.307Z",

{
  "_index": "logstash-2018.05.04",
  "_type": "doc",
  "_id": "9Un6LGMBrQNK-vOnc8A3",
  "_version": 1,
  "_score": null,
  "_source": {
    "msgid": "0a8edae0-4fdf-11e8-b234-5d9545c6b53f",
    "@timestamp": "2018-05-04T21:06:41.307Z",
    "message": "<22>May  4 21:06:41.307 outbound2.ore.mailhop.org smtpd: [0a8edae0-4fdf-11e8-b234-5d9545c6b53f] [SMTP] [AUTH] AUTH LOGIN",
    "syslog_hostname": "outbound2.ore.mailhop.org",
    "syslog_program": "smtpd",
    "host": "ip-172-31-16-159.us-west-2.compute.internal",
    "received_at": "2018-05-04T21:06:41.062Z",
    "port": 28491,
    "syslog_message": "[SMTP] [AUTH] AUTH LOGIN",
    "@version": "1",
    "type": "syslog"
  },
  "fields": {
    "received_at": [
      "2018-05-04T21:06:41.062Z"
    ],
    "@timestamp": [
      "2018-05-04T21:06:41.307Z"
    ]
  },
  "sort": [
    1525468001307
  ]
}

First challenge... that grok you are using is way too basic to catch the multitude of strange things vendors send via syslog. Unless you have a lot of control over the source data, you will need something more complex.
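
For example, grok's match accepts an array of patterns per field, tried in order until one matches (a sketch; the second pattern is a hypothetical fallback):

grok {
  match => {
    "message" => [
      "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}",
      # Hypothetical fallback for senders that omit the hostname/program header
      "%{SYSLOGTIMESTAMP:syslog_timestamp} %{GREEDYDATA:syslog_message}"
    ]
  }
}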

Second challenge... related to the above, vendors will send a lot of different formats of timestamps. Your date filter will have to account for these. Keep in mind that order matters. The first match wins, so the patterns should be ordered most specific to least specific to avoid matching the wrong pattern.
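
For example (a sketch; the extra no-millisecond formats are hypothetical additions, ordered most specific first):

date {
  match => [
    "syslog_timestamp",
    "MMM  d HH:mm:ss.SSS",
    "MMM dd HH:mm:ss.SSS",
    "MMM  d HH:mm:ss",
    "MMM dd HH:mm:ss"
  ]
}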

For a good example of the basic syslog processing that we do, take a look at this...

We have gigabytes of sample data from hundreds of different devices and apps, and the methods used in that repo do a pretty good job of handling the couple dozen variations we have seen.

You may be able to adapt some of the concepts to your needs.

Rob

Robert Cowart (rob@koiossian.com)
www.koiossian.com
True Turnkey SOLUTIONS for the Elastic Stack
