Timestamp differs by 6 seconds between Kafka record timestamp and the elasticsearch index timestamp

Hi,

There is a ~6 seconds difference in the @timestamp field sent to elasticsearch through Kafka.

Here are the details:
The JSON object is built in java and is published to Kafka.
The Logstash consumes data from Kafka and outputs it to Elasticsearch
The data is discovered in Kibana and also displayed in some dashboards

I am logging the @timestamp field in the java application. The Value is same in the java application logs and also in Kafka Records. However, when i see the same in elasticsearch index, it is off by few seconds (5 to 6 secs).

The Logstash configuration is:

input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    topics => ["wifi-data"]
    codec => "json"
  }
}

output {
  elasticsearch {
    ilm_policy => "wifi-data-policy"
    ilm_rollover_alias => "wifi-data"
    hosts => ["localhost:9200"]
    user => "elastic"
    password => "aaaaaaa"
  }
}

Any pointers on what could be wrong and how to correct this.

Thanks,
Strive

Does the event in kafka has a timestamp? What is its name?
Could you share what a typical event in Kafka looks like?

Note: All the services are running in same machine. Its my development machine.

Yes @timestamp field is present in Kafka record. Its the last field.

Sample Kafka Record:

{
   "host":{
      "name":"strive-pc"
   },
   "softwareVersion":"",
   "productClass":" ",
   "manufacturer":" ",
   "modelName":"USP Agent 0.1",
   "hardwareVersion":"",
   "manufacturerOUI":"093010",
   "serialNumber":"RM9840593011",
   "collectionTime":"2020-11-02T14:06:39Z",
   "deviceId":"Device 1 ID",
   "radioId":"Device 1 Radio 1 ID",
   "radioEnabled":false,
   "radioNoise":19,
   "radioUtilization":208,
   "bssBSSID":"macaddress",
   "bssSSID":"BSS SSID",
   "bssEnabled":false,
   "bssTimeStamp":"",
   "staMACAddress":"sta macaddress 1",
   "staTimeStamp":"",
   "staUtilizationReceive":129,
   "staUtilizationTransmit":105,
   "staSignalStrength":0.0,
   "staBytesSent":37048,
   "staBytesReceived":213564,
   "staErrorsSent":30,
   "staErrorsReceived":9,
   "staRetransCount":21,
   "staIPV4Address":"ipv4 address 1",
   "staIPV6Address":"ipv6 address 1",
   "staHostname":"android",
   "@timestamp":"2020-11-02T14:06.39Z"
}

In logstash index it is: "@timestamp" : "2020-11-02T14:06:33.400Z"

Could you share the full document in elasticsearch for the same event?

The document is elasticsearch is:

{
        "_index" : "wifi-data-2020.11.02-000081",
        "_type" : "_doc",
        "_id" : "rDJIiXUBMzhHx7NYA8Ox",
        "_score" : 1.0,
        "_source" : {
          "host" : {
            "name" : "strive-pc"
          },
          "serialNumber" : "RM9840593011",
          "manufacturer" : " ",
          "staBytesSent" : 37048,
          "hardwareVersion" : "",
          "modelName" : "USP Agent 0.1",
          "bssTimeStamp" : "",
          "staErrorsSent" : 30,
          "staErrorsReceived" : 9,
          "staUtilizationTransmit" : 105,
          "softwareVersion" : "",
          "staBytesReceived" : 213564,
          "staMACAddress" : "sta macaddress 1",
          "radioEnabled" : false,
          "staIPV4Address" : "ipv4 address 1",
          "staIPV6Address" : "ipv6 address 1",
          "staTimeStamp" : "",
          "productClass" : " ",
          "radioNoise" : 19,
          "radioUtilization" : 208,
          "staSignalStrength" : 0.0,
          "manufacturerOUI" : "093010",
          "radioId" : "Device 1 Radio 1 ID",
          "collectionTime" : "2020-11-02T14:06:39Z",
          "bssSSID" : "BSS SSID",
          "deviceId" : "Device 1 ID",
          "staRetransCount" : 21,
          "@timestamp" : "2020-11-02T14:06:33.400Z",
          "bssBSSID" : "macaddress",
          "staHostname" : "android",
          "staUtilizationReceive" : 129,
          "bssEnabled" : false,
          "@version" : "1"
        }
 }

My guess is that the kafka input plugin does not care at all about the content and generates its own @timestamp field for the event at the moment it is collected.

I'd probably add a mutate filter to make sure to overwrite the @timestamp value using the collectionTime field.

    filter {
      mutate {
         copy => { "collectionTime" => "@timestamp" }
      }
    }

There will be some performance impact if we add that filter.
Note: I am yet to measure the performance aspects.

I will debug for some more time and then make a decision.

Thanks a ton David.

Cheers

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.