Elasticsearch Add Ingest Timestamp and Calculate Lag

Elasticsearch experts, I need your help to achieve the goal below.

Goal: Find a way to calculate the lag between the time a log message was generated by the application (@timestamp field) and the time it was ingested into Elasticsearch (ingest_time field).

Current Setup:

  1. I am using FluentD to capture the logs and send them to Kafka. I then use Kafka Connect (Elasticsearch sink connector) to forward the logs to Elasticsearch. Since there is a Kafka layer between FluentD and Elasticsearch, I want to calculate the lag between the log message generation time and the ingestion time.

  2. The log message generation time is stored in the @timestamp field of the log and is set when the application generates the log. Below is how a log message looks on the Kafka topic.

    {
      "message": "ServiceResponse - Throwing non 2xx response",
      "log_level": "ERROR",
      "thread_id": "http-nio-9033-exec-21",
      "trace_id": "86d39fbc237ef7f8",
      "user_id": "85355139",
      "tag": "feedaggregator-secondary",
      "@timestamp": "2022-06-18T23:30:06+0530"
    }

  3. I have created an ingest pipeline that adds the ingest_time field to every document inserted into the Elasticsearch index.

    PUT _ingest/pipeline/ingest_time
    {
      "description": "Add an ingest timestamp",
      "processors": [
        {
          "set": {
            "field": "_source.ingest_time",
            "value": "{{_ingest.timestamp}}"
          }
        }]
    }

  4. Once a document is inserted into the index from Kafka via Kafka Connect (ES sink connector), this is how it looks in Kibana in JSON format.

    {
      "_index": "feedaggregator-secondary-2022-06-18",
      "_type": "_doc",
      "_id": "feedaggregator-secondary-2022-06-18+2+7521337",
      "_version": 1,
      "_score": null,
      "_source": {
        "thread_id": "http-nio-9033-exec-21",
        "trace_id": "86d39fbc237ef7f8",
        "@timestamp": "2022-06-18T23:30:06+0530",
        "ingest_time": "2022-06-18T18:00:09.038032Z",
        "user_id": "85355139",
        "log_level": "ERROR",
        "tag": "feedaggregator-secondary",
        "message": "ServiceResponse - Throwing non 2xx response"
      },
      "fields": {
        "@timestamp": [
          "2022-06-18T18:00:06.000Z"
        ]
      },
      "sort": [
        1655574126000
      ]
    }


  5. Next, I wanted to calculate the difference between the @timestamp and ingest_time fields. For this I added a script processor to the ingest pipeline, which adds a field lag_in_seconds and sets its value to the difference between the ingest_time and @timestamp fields.

    PUT _ingest/pipeline/calculate_lag
    {
      "description": "Add an ingest timestamp and calculate ingest lag",
      "processors": [
        {
          "set": {
            "field": "_source.ingest_time",
            "value": "{{_ingest.timestamp}}"
          }
        },
        {
          "script": {
            "lang": "painless",
            "source": """
                if(ctx.containsKey("ingest_time") && ctx.containsKey("@timestamp")) {
                  ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;
                }
            """
          }
        }
      ]
    }

Error:
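However, because my ingest_time and @timestamp fields are in different formats, the script failed with a DateTimeParseException. The error points at index 22 of the @timestamp value, which is the minutes part of the +0530 offset.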

    {
      "error": {
        "root_cause": [
          {
            "type": "exception",
            "reason": "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: DateTimeParseException[Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22];",
            "header": {
              "processor_type": "script"
            }
          }
        ],
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: DateTimeParseException[Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22];",
        "caused_by": {
          "type": "illegal_argument_exception",
          "reason": "ScriptException[runtime error]; nested: DateTimeParseException[Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22];",
          "caused_by": {
            "type": "script_exception",
            "reason": "runtime error",
            "script_stack": [
              "java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2049)",
              "java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1948)",
              "java.base/java.time.ZonedDateTime.parse(ZonedDateTime.java:598)",
              "java.base/java.time.ZonedDateTime.parse(ZonedDateTime.java:583)",
              "ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;\n            }",
              "                                                                         ^---- HERE"
            ],
            "script": "            if(ctx.containsKey(\"ingest_time\") && ctx.containsKey(\"@timestamp\")) {\n              ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;\n            }",
            "lang": "painless",
            "caused_by": {
              "type": "date_time_parse_exception",
              "reason": "Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22"
            }
          }
        },
        "header": {
          "processor_type": "script"
        }
      },
      "status": 500
    }
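
For reference, the parse failure and one possible way around it can be reproduced in plain Java, since the Painless script above calls the same java.time classes. This is only a sketch: the class name LagSketch and the method lagSeconds are made up for illustration, and the pattern string is an assumption based on the single sample value shown in this post. Whether DateTimeFormatter.ofPattern can be called from Painless on AWS-managed Elasticsearch 7.1 is also an assumption that would need to be checked.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Elasticsearch or of the pipeline above.
class LagSketch {
    // Pattern letter Z accepts an offset without a colon, such as +0530.
    // The default ISO parser used by ZonedDateTime.parse(String) expects +05:30.
    private static final DateTimeFormatter APP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");

    // Whole seconds between the application timestamp and the ingest timestamp.
    static long lagSeconds(String appTimestamp, String ingestTimestamp) {
        ZonedDateTime generated = ZonedDateTime.parse(appTimestamp, APP_FORMAT);
        // ingest_time is already ISO-8601 with a Z suffix, so the default parser works.
        ZonedDateTime ingested = ZonedDateTime.parse(ingestTimestamp);
        return ChronoUnit.MILLIS.between(generated, ingested) / 1000;
    }

    public static void main(String[] args) {
        // Values copied from the sample document in this post.
        System.out.println(
                lagSeconds("2022-06-18T23:30:06+0530", "2022-06-18T18:00:09.038032Z")); // prints 3
    }
}
```

If the same formatter is usable inside the script processor, only the first ZonedDateTime.parse call in the pipeline would need the extra formatter argument; the ingest_time parsing could stay as it is.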

So I need your help to calculate lag_in_seconds between the @timestamp and ingest_time fields.

I am using managed Elasticsearch by AWS (OpenSearch), Elasticsearch version 7.1.

OpenSearch/OpenDistro are AWS-run products and differ from the original Elasticsearch and Kibana products that Elastic builds and maintains. You may need to contact them directly for further assistance.

(This is an automated response from your friendly Elastic bot. Please report this post if you have any suggestions or concerns :elasticheart: )