[Logstash] Replace the @timestamp with an rsyslog ISO8601 timestamp from Filebeat log lines

Hi Elastic users,

I'm forwarding logs from a host to an ELK Logstash instance via Filebeat. I'm using @timestamp as the time field for the index pattern for my logs. The original @timestamp is set by Filebeat while processing the log line. I'd like to use the rsyslog timestamp as the index pattern time while still using the @timestamp field, so I think I need to overwrite @timestamp with rsyslog.timestamp. The Filebeat dissect processor extracts the ISO8601 timestamp 2020-06-16T14:13:29.808106+00:00 as the rsyslog.timestamp field and sends it to Logstash.

2020-06-16T14:13:29.808106+00:00 hostname proxy-server: info 192.150.13.21 192.150.13.201 16/Jun/2020/14/13/29 HEAD /v1/AUTH_test/con/obj HTTP/1.0 204 - checkworker passtoken - - - tx111e7cf6ce7f4b499cef0-005ee8d123 - 0.0557 - - 1592316809.731336117 1592316809.787007093 0 -

I found some related discussions and adapted one of the examples to my case, as below.

filter {
  if [type] == "beats" {
    grok {
      match => { "rsyslog.timestamp" => "%{TIMESTAMP_ISO8601:ts}" }
    }
    date {
      match => ["ts", "ISO8601"]
      target => "@timestamp"
      #remove_field => [ "ts", "timestamp" ]
    }
  }
}

However, the result still uses the log ingestion time as the timestamp.

{
  "_index": "logstash-filebeats-2020.06.16",
  "_type": "doc",
  "_id": "8_eCvXIB6flSpCLHFqih",
  "_version": 1,
  "_score": null,
  "_source": {
    "content_length": "-",
    "message": "2020-06-16T09:20:44.936764+00:00 hostname container-server: info 192.150.23.3 - - [16/Jun/2020:09:20:44 +0000] \"DELETE /d157/3806/.shards_AUTH_acc/test-data/obj\" 204 - \"DELETE http://abc.net\" \"object-server 305346\" 0.0008 \"-\" 25892 0",
    "status_int": "204",
    "rsyslog": {
      "timestamp": "2020-06-16T09:20:44.936764+00:00",
      "hostname": "ss0137"
    },
    "@timestamp": "2020-06-16T14:21:57.793Z",
    "additional_info": "-",
    "tags": [
      "beats_input_codec_plain_applied",
      "_grokparsefailure"
    ],
    "log": {
      "file": {}
    },
    "user_agent": "object-server 305346",
    "request_method": "DELETE"
  },
  "fields": {
    "@timestamp": [
      "2020-06-16T14:21:57.793Z"
    ],
    "rsyslog.timestamp": [
      "2020-06-16T09:20:44.936Z"
    ]
  },
  "sort": [
    1592317317793
  ]
}

Could I get some ideas on how to replace the timestamp with the rsyslog.timestamp field?

  • ELK 6.6
  • Filebeat 7.7

Thanks // Hugo

In logstash, when the rsyslog object contains a timestamp field you refer to it as [rsyslog][timestamp]. rsyslog.timestamp would refer to a field with a period in its name.
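For illustration, a hypothetical mutate showing the two references side by side (the ts_from_* destination fields are invented just for this sketch):

filter {
  # bracket notation addresses the nested field { "rsyslog": { "timestamp": "..." } },
  # while the dotted name would mean a single top-level field with a period in its name
  mutate {
    copy => {
      "[rsyslog][timestamp]" => "ts_from_nested_field"
      "rsyslog.timestamp" => "ts_from_dotted_field"
    }
  }
}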

Hi Badger,

I defined rsyslog.timestamp myself in the dissect processor.

processors:
    - dissect:
        tokenizer: "%{rsyslog.timestamp} %{rsyslog.hostname} %{programname}: %{severity} %{swift_logs}"
        field: "message"
        target_prefix: ""

What's the right way to assign a field like abc.efg to be the @timestamp in Logstash?

filebeat and elasticsearch do not use the same syntax to name fields that logstash uses.
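So the field that Kibana and Elasticsearch display as rsyslog.timestamp is addressed from Logstash with bracket notation, for example in a conditional (the tag name below is made up for this sketch):

filter {
  if [rsyslog][timestamp] {
    # this matches the field Kibana/Elasticsearch show as "rsyslog.timestamp"
    mutate { add_tag => [ "has_rsyslog_timestamp" ] }
  }
}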

Do you mean there's no way to use the ISO8601 date format as the @timestamp?

I have another grok filter that does the trick, but the problem is that I don't want Logstash to parse the whole message.

filter {
  if [type] == "beats" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:ts} %{SYSLOGHOST:sysloghost} %{SYSLOGPROG:junk}: %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
      rename => ["program", "programname"]
    }
    date {
      match => ["ts", "ISO8601", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss"]
      target => "@timestamp"
      remove_field => [ "ts", "timestamp" ]
    }
  }
}

use a date filter to match [rsyslog][timestamp] with ISO8601 and @timestamp as target
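for example, a minimal sketch (target defaults to @timestamp, so that line is optional):

filter {
  date {
    match  => [ "[rsyslog][timestamp]", "ISO8601" ]
    target => "@timestamp"
  }
}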

@ptamba Thanks for the suggestion.

No luck still.

Source from filebeat:

2020-06-17T05:17:17.931Z  DEBUG [processors]  processing/processors.go:187  Publish event: {
  "@timestamp": "2020-06-17T05:17:17.930Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.7.1"
  },
  "host": {},
  "message": "2020-06-17T04:06:41.945781+00:00 hostname proxy-server: info 10.150.10.130 10.150.10.130 17/Jun/2020/04/06/41 GET /acc/cache/obj HTTP/1.0 206 - Linux%3B%20arch:amd64 - - 352214 - txa61005ee996d1 - 0.0499 - - 1592366801.894201040 1592366801.944087982 - 0.0448179244995",
  "headers": "-",
  "remote_addr": "190.50.1.30",
  "client_ip": "190.50.1.30",
  "rsyslog": {
    "hostname": "hostname",
    "timestamp": "2020-06-17T04:06:41.945781+00:00"
  },
  "log": {
    "file": {}
  },
  "programname": "proxy-server",
  "ttfb": "0.0448179244995",
  "user_agent": "NVIDIA/1.0%20%28Linux%3B%20arch:amd64%3B%20lang:go1.14%29%20Cmd/etl-agent%20Lib/ai.nvda.git.nucleus.src.common.yarofs%20Git/57fffcdf",
  "input": {},
  "request_method": "GET",
  "policy_index": "-",
  "client_etag": "-",
  "ecs": {},
  "protocol": "HTTP/1.0",
  "log_info": "-",
  "severity": "info",
  "request_time": "0.0499"
}

Logstash filter

filter {
  date {
    match => [ "rsyslog.timestamp", "ISO8601" ]
  }
}

Result JSON in ES.

     "@timestamp": "2020-06-17T05:14:33.569Z",
     "agent": {},
    "bytes_recvd": "-",
    "input": {},
    "client_etag": "-",
    "type": "beats",
    "protocol": "HTTP/1.0",
    "status_int": "206",
    "source": "S3",
    "headers": "-",
        "tags": [
      "beats_input_codec_plain_applied"
    ],
    "start_time": "1592364436.912203074",
    "programname": "proxy-server"
  },
  "fields": {
    "@timestamp": [
      "2020-06-17T05:14:33.569Z"
    ],
    "rsyslog.timestamp": [
      "2020-06-17T03:27:16.934Z"
    ]

as badger pointed out, there's a difference in field syntax between logstash and filebeat. you should use [rsyslog][timestamp] in logstash, not rsyslog.timestamp

@Badger @ptamba Thanks for the help. It's solved with your answers.

My apologies to @Badger that I didn't understand the key point you mentioned at the beginning.

The field from Filebeat looks like this:

 "rsyslog": {
    "hostname": "hostname",
    "timestamp": "2020-06-17T04:06:41.945781+00:00"

Referring to the field as [rsyslog][timestamp] works well.

filter {
  date {
    match => [ "[rsyslog][timestamp]", "ISO8601" ]
  }
}

Key point: Filebeat sends field names in a different format than the one Logstash uses to reference them.
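Putting the pieces from this thread together, a combined sketch (the [type] == "beats" conditional from my earlier configs is optional, only needed when other inputs share the pipeline):

filter {
  if [type] == "beats" {
    date {
      # [rsyslog][timestamp] is how Logstash refers to Filebeat's rsyslog.timestamp
      match  => [ "[rsyslog][timestamp]", "ISO8601" ]
      target => "@timestamp"
    }
  }
}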

Thanks // Hugo
