So I have an application where two events are generated asynchronously for each request.
The 1st event indicates that the downstream system acknowledged the request, and the 2nd event indicates whether the initial request succeeded or failed.
90% of the time, the 1st and 2nd events happen within 1-2 seconds of each other. For the other 10%, the 2nd event can come 1-3 days later, and sometimes not at all, but we will always have "acknowledgements" right away.
An analogy would be sent emails vs. read emails: 90% of emails are read as soon as they are received, 10% within 3 days, and some are not read at all.
In our app the other day, we noticed that the 90% were read about 5 minutes later, when usually it takes 2-3 seconds. So to us it means the system acknowledged delivery of the messages, but for whatever reason they only got delivered 5 minutes later than the usual 2-3 seconds.
Elasticsearch's Machine Learning can help you in the following way:
I'd recommend storing the time it took between "acknowledged" and "delivery" in that last delivery document, for example in a field "deliverytime".
You can then create a single metric job with a detector function of "high_mean(deliverytime)". This can be done using the Single Metric Job wizard in the Kibana UI. Using the advanced job UI, you could also add, for example, the event ID as an influencer, which would allow you to do some root cause analysis and see which events caused the delay.
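For reference, here's a minimal sketch of what such a job configuration might look like through the REST API rather than the wizard (the job ID, bucket span, and the event_id influencer field are assumptions about your data; the endpoint is _xpack/ml/... on 6.x and _ml/... on 7.x+):

```
PUT _ml/anomaly_detectors/delivery_time
{
  "analysis_config": {
    "bucket_span": "15m",
    "detectors": [
      { "function": "high_mean", "field_name": "deliverytime" }
    ],
    "influencers": ["event_id"]
  },
  "data_description": {
    "time_field": "@timestamp"
  }
}
```

You'd then create and start a datafeed pointing at the index that holds the delivery documents.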
Ok so when the second event comes in... I need to look up the first event, calculate the time difference, and store that with the second event, right?
My pipeline now is simple... Raw logs are pushed from various host machines to a single Kafka topic. From there I have Logstash parse them and push into specific indexes. Each log is tagged with its "application type".
So now I'm guessing I need an intermediate topic with only those events, and some stream processor with window functions, like Kafka Streams or Flink?
You're right, at some point this lookup needs to be done, and I'd recommend doing it at ingestion time (e.g. Logstash) instead of query time (e.g. the ML plugin).
This can be achieved using Logstash; for example, here's some documentation about your use case: https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html. The described filter plugin can query previously indexed Elasticsearch documents, fetch a field from them (e.g. the timestamp of the first event), calculate a duration based on both timestamps, and add that as another field to your final document.
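A rough sketch of what that could look like, adapted from the example on that documentation page (the event_type/event_id field names and the deliverytime target field are assumptions about your schema):

```
filter {
  if [event_type] == "delivery" {
    # Look up the earlier acknowledgement document that shares this event's ID
    elasticsearch {
      hosts => ["localhost:9200"]
      query => "event_type:acknowledged AND event_id:%{[event_id]}"
      fields => { "@timestamp" => "ack_timestamp" }
    }
    # Parse the copied timestamp string back into a time object
    date {
      match => ["ack_timestamp", "ISO8601"]
      target => "ack_timestamp"
    }
    # Store the acknowledgement-to-delivery duration in seconds on the delivery document
    ruby {
      code => "event.set('deliverytime', event.get('@timestamp') - event.get('ack_timestamp')) rescue nil"
    }
  }
}
```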
ML cannot currently deal with stitching together asynchronous events, although there are some things we're working on that could make this possible in the future.
If you only have output logs to work with, then doing this at ingest time with Logstash may be overly tricky, and may actually be impossible.
If you have control over the code that your application uses to invoke the generation of these events, then possibly the best approach would be to have that code also integrate with our open-source APM Server. Using the APM Server's API (https://www.elastic.co/guide/en/apm/server/current/transaction-api.html), you can time the transaction between the two events. APM then stores this information (transaction times and error counts) in Elasticsearch, and you can then use ML to profile and alert on unusual deviations in this data.
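To make that concrete, reporting a transaction to the APM Server's v1 intake endpoint looks roughly like the sketch below (the service name, transaction fields, and duration value are illustrative only; check the linked docs for the exact schema of your server version):

```
curl -X POST http://localhost:8200/v1/transactions \
  -H 'Content-Type: application/json' \
  -d '{
    "service": {
      "name": "event-dispatcher",
      "agent": { "name": "custom", "version": "1.0" }
    },
    "transactions": [
      {
        "id": "945254c5-67a5-417e-8a4e-aa29efcbfb79",
        "name": "ack-to-delivery",
        "type": "messaging",
        "duration": 2350.0,
        "timestamp": "2018-05-30T18:53:27.154Z"
      }
    ]
  }'
```

ML can then run the same high_mean style of detector over the stored transaction durations.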
Well, I have the ID so I can look it up. I see two conflicting views here... Doesn't stream processing like Kafka Streams, Flink, Storm, or even a Logstash pipeline solve this issue?
To my knowledge there's no existing Logstash plugin that gives this functionality. You may have to create your own custom plugin, based on this other thread: Logstash transaction processing