Elasticsearch experts, I need your help achieving the goal below.
Goal: Find a way to calculate the lag between the time a log message was generated by the application (the @timestamp field) and the time it was ingested into Elasticsearch (the ingest_time field).
Current Setup:
- I use FluentD to capture the logs and send them to Kafka, and then Kafka Connect (the Elasticsearch sink connector) to forward them to Elasticsearch. Because there is a Kafka layer between FluentD and Elasticsearch, I want to measure the lag between the time a log message is generated and the time it is ingested.
- The generation time is stored in the @timestamp field of the log and is set when the application generates the log. Here is how a log message looks on the Kafka topic:
```json
{
  "message": "ServiceResponse - Throwing non 2xx response",
  "log_level": "ERROR",
  "thread_id": "http-nio-9033-exec-21",
  "trace_id": "86d39fbc237ef7f8",
  "user_id": "85355139",
  "tag": "feedaggregator-secondary",
  "@timestamp": "2022-06-18T23:30:06+0530"
}
```
- I created an ingest pipeline that adds an ingest_time field to every document indexed into Elasticsearch:
```
PUT _ingest/pipeline/ingest_time
{
  "description": "Add an ingest timestamp",
  "processors": [
    {
      "set": {
        "field": "_source.ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
```
- Once a document is indexed from Kafka by Kafka Connect (the Elasticsearch sink connector), this is how it looks in Kibana's JSON view:
```json
{
  "_index": "feedaggregator-secondary-2022-06-18",
  "_type": "_doc",
  "_id": "feedaggregator-secondary-2022-06-18+2+7521337",
  "_version": 1,
  "_score": null,
  "_source": {
    "thread_id": "http-nio-9033-exec-21",
    "trace_id": "86d39fbc237ef7f8",
    "@timestamp": "2022-06-18T23:30:06+0530",
    "ingest_time": "2022-06-18T18:00:09.038032Z",
    "user_id": "85355139",
    "log_level": "ERROR",
    "tag": "feedaggregator-secondary",
    "message": "ServiceResponse - Throwing non 2xx response"
  },
  "fields": {
    "@timestamp": [
      "2022-06-18T18:00:06.000Z"
    ]
  },
  "sort": [
    1655574126000
  ]
}
```
- Next, I wanted the difference between the @timestamp and ingest_time fields. For this I added a script processor to the pipeline that adds a lag_in_seconds field, set to the difference between ingest_time and @timestamp in seconds:
```
PUT _ingest/pipeline/calculate_lag
{
  "description": "Add an ingest timestamp and calculate ingest lag",
  "processors": [
    {
      "set": {
        "field": "_source.ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          if(ctx.containsKey("ingest_time") && ctx.containsKey("@timestamp")) {
            ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;
          }
        """
      }
    }
  ]
}
```
Error:
Because @timestamp is in a different format from ingest_time (its offset is written as +0530, whereas ingest_time ends in Z), the script fails with a DateTimeParseException:
```json
{
  "error": {
    "root_cause": [
      {
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: DateTimeParseException[Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22];",
        "header": {
          "processor_type": "script"
        }
      }
    ],
    "type": "exception",
    "reason": "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: DateTimeParseException[Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22];",
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "ScriptException[runtime error]; nested: DateTimeParseException[Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22];",
      "caused_by": {
        "type": "script_exception",
        "reason": "runtime error",
        "script_stack": [
          "java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2049)",
          "java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1948)",
          "java.base/java.time.ZonedDateTime.parse(ZonedDateTime.java:598)",
          "java.base/java.time.ZonedDateTime.parse(ZonedDateTime.java:583)",
          "ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;\n }",
          " ^---- HERE"
        ],
        "script": " if(ctx.containsKey(\"ingest_time\") && ctx.containsKey(\"@timestamp\")) {\n ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;\n }",
        "lang": "painless",
        "caused_by": {
          "type": "date_time_parse_exception",
          "reason": "Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22"
        }
      }
    },
    "header": {
      "processor_type": "script"
    }
  },
  "status": 500
}
```
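To see why the script fails, the parsing step can be reproduced outside Elasticsearch with plain java.time, the same classes the Painless script calls. In the sketch below, the class `OffsetFormatCheck` and the method `parsesWithIsoDefault` are hypothetical names used only for illustration. `ZonedDateTime.parse(text)` uses `DateTimeFormatter.ISO_ZONED_DATE_TIME`, whose offset part expects the extended form `+05:30` (or `Z`), so the basic-form offset `+0530` in `@timestamp` is rejected while `ingest_time` parses without trouble.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;

// Hypothetical helper class: mimics the ZonedDateTime.parse(...) calls in the script processor.
public class OffsetFormatCheck {

    // Returns true if the default ISO parser (ISO_ZONED_DATE_TIME) accepts the text.
    public static boolean parsesWithIsoDefault(String text) {
        try {
            ZonedDateTime.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // @timestamp as written by the application: basic offset "+0530" (no colon)
        System.out.println(parsesWithIsoDefault("2022-06-18T23:30:06+0530"));
        // The same local time with an extended offset "+05:30"
        System.out.println(parsesWithIsoDefault("2022-06-18T23:30:06+05:30"));
        // ingest_time as produced by {{_ingest.timestamp}} in the sample document
        System.out.println(parsesWithIsoDefault("2022-06-18T18:00:09.038032Z"));
    }
}
```

Running `main` prints `false`, `true`, `true`: only the colon-less offset is the problem, not the presence of two different offsets.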
How can I calculate lag_in_seconds, the difference between the @timestamp and ingest_time fields?
Environment: AWS managed Elasticsearch (Amazon OpenSearch Service), Elasticsearch version 7.1.
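One possible fix, assuming the application keeps emitting `@timestamp` as `yyyy-MM-dd'T'HH:mm:ssZ` (second precision and a basic-form offset, as in the sample message), is to parse that field with an explicit pattern instead of the ISO default. The sketch below is plain Java, and `IngestLag` and `lagInSeconds` are hypothetical names. Pattern letter `Z` accepts offsets such as `+0530`, and `ChronoUnit.MILLIS.between` compares the two instants, so the differing offsets (+05:30 vs UTC) do not affect the result.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical class name; the parsing logic mirrors what the Painless script could do.
public class IngestLag {

    // Assumed @timestamp format: pattern letter 'Z' parses offsets written as +HHMM, e.g. "+0530".
    private static final DateTimeFormatter APP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");

    // Lag in whole seconds between log generation (@timestamp) and ingestion (ingest_time).
    public static long lagInSeconds(String timestamp, String ingestTime) {
        ZonedDateTime generated = ZonedDateTime.parse(timestamp, APP_FORMAT);
        // ingest_time is already ISO-8601 with a 'Z' offset, so the default parser works
        ZonedDateTime ingested = ZonedDateTime.parse(ingestTime);
        // between() measures the elapsed time between the two instants;
        // integer division by 1000 truncates to whole seconds, as in the original script
        return ChronoUnit.MILLIS.between(generated, ingested) / 1000;
    }

    public static void main(String[] args) {
        System.out.println(lagInSeconds("2022-06-18T23:30:06+0530", "2022-06-18T18:00:09.038032Z"));
    }
}
```

With the values from the sample document, this prints `3` (3038 ms, truncated). Since `DateTimeFormatter`, `ZonedDateTime` and `ChronoUnit` are part of the Painless API in 7.x, the same `ZonedDateTime.parse(ctx['@timestamp'], DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ"))` call should work inside the script processor, though this has not been verified on the AWS-managed 7.1 domain. Alternatively, having the application emit the extended offset (`+05:30`) would let the original script parse `@timestamp` unchanged.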