How to implement task life cycle scenario

Hello, I am new to the ELK stack and want to build a scenario in which I can track elapsed time. For example, I have a platform where a user can CREATE a ticket. The ticket is assigned to a department or person and then moves forward until it is CLOSED by the user or the system.
So far I have created the following sample log:

{"log_time":"27/01/2018 00:00:00.000","user_id":789,"session_id":17878,"event":"CREATED","action":"","job_id":1,"msg":""}
{"log_time":"27/01/2018 01:00:00.000","user_id":569,"session_id":54578,"event":"RANDOM","action":"","job_id":2,"msg":""}
{"log_time":"27/01/2018 02:00:00.000","user_id":569,"session_id":54578,"event":"RANDOM","action":"","job_id":2,"msg":""}
{"log_time":"29/01/2018 00:00:00.000","user_id":789,"session_id":87896,"event":"CLOSED","action":"","job_id":1,"msg":""}

job_id can be a ticket, task, or project ID.

From the logs above I need:

  • the elapsed time between the CREATED and CLOSED events

How can I achieve this? Anything would be helpful. Thank you.

Use an aggregate filter. Look at example 1 in the documentation.

I don't think that example is the solution, because events arrive in arbitrary order; for example, two consecutive CREATED events can come from different users. Please see the logs below. More importantly, the CREATED and CLOSED events can be months apart.

{"log_time":"27/01/2018 00:00:00.000","user_id":789,"session_id":17878,"event":"CREATED","action":"","job_id":1,"msg":""}
{"log_time":"27/01/2018 00:00:00.000","user_id":789,"session_id":89894,"event":"CREATED","action":"","job_id":5,"msg":""}
{"log_time":"27/01/2018 01:00:00.000","user_id":569,"session_id":54578,"event":"RANDOM","action":"","job_id":2,"msg":""}
{"log_time":"27/01/2018 02:00:00.000","user_id":569,"session_id":54578,"event":"RANDOM","action":"","job_id":2,"msg":""}
{"log_time":"28/01/2018 00:00:00.000","user_id":789,"session_id":56565,"event":"CLOSED","action":"","job_id":5,"msg":""}
{"log_time":"29/01/2018 00:00:00.000","user_id":789,"session_id":87896,"event":"CLOSED","action":"","job_id":1,"msg":""}

I do not see why any of that would prevent you using an aggregate filter to solve the problem using example 1 as a guide.

Sorry, I was not clear before. Let me explain the scenario in detail. Suppose I have a back-end application that generates 100 log records per second. The logs combine system logs and application logs in one generic format.

{"log_time":"01/01/2018 00:00:00.000","user_id":111,"session_id":17878,"event":"CREATED","action":"","job_id":1,"msg":"create new job ===> id = 1"}
{"log_time":"01/01/2018 01:00:00.000","user_id":569,"session_id":54578,"event":"RANDOM","action":"","job_id":0,"msg":""}
{"log_time":"01/01/2018 02:00:00.000","user_id":569,"session_id":54578,"event":"RANDOM","action":"","job_id":0,"msg":""}
{"log_time":"30/01/2018 02:00:00.000","user_id":979,"session_id":78899,"event":"RANDOM","action":"","job_id":0,"msg":""}
{"log_time":"31/01/2018 00:00:00.000","user_id":111,"session_id":87896,"event":"CLOSED","action":"","job_id":1,"msg":"job id = 1 completed"}

In the logs above, the user with user_id = 111 CREATED a new job/ticket/record (job_id=1) in the application on 01/01/2018, and on 31/01/2018 the same user CLOSED it.
That is 100 * 60 * 60 * 24 = 8,640,000 records generated per day, and I want information about an individual job on the fly, e.g. for job_id=1:

  • the number of jobs/tickets/records CREATED/CLOSED in one day/month/year
  • the elapsed time between two events (job_id=1 and event=CREATED/CLOSED)
  • the average time spent on a job/ticket/record by one person (user_id = 111)

Is this possible to implement in Logstash, or is there another way to achieve it?
@Badger, if you still think this is possible using example 1 of the Logstash aggregate filter, could you explain in a little more detail? I have not been able to figure it out.

I have another solution that covers all of the above scenarios, but it requires changing the back-end application. When the application emits the CLOSED event, it adds the CREATED event's time (start_time) to the same log record, and then I only need to add one scripted field in Kibana, as follows.

Log:

{"log_time":"31/01/2018 00:00:00.000","user_id":111,"session_id":87896,"event":"CLOSED","action":"","job_id":1,"msg":"job id = 1 completed", "start_time":"01/01/2018 00:00:00.000"}

Scripted field:

if (doc['start_time'].size() != 0) {
    return (doc['log_time'].value.millis - doc['start_time'].value.millis) / 1000 / 60;
} else {
    return null;
}

In the image below I am calculating the elapsed time (in minutes) between LOGGEDIN and LOGGEDOUT events using the scripted field feature in Kibana (the time_span column).
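
As a sanity check on the scripted field arithmetic, here is a minimal Python sketch of the same calculation applied to the sample CLOSED record above. The function name `elapsed_minutes` is made up for illustration, and the sketch assumes both fields use the `dd/MM/yyyy HH:mm:ss.SSS` layout shown in the logs.

```python
from datetime import datetime

# Python equivalent of the dd/MM/yyyy HH:mm:ss.SSS layout used in the logs
LOG_TIME_FORMAT = "%d/%m/%Y %H:%M:%S.%f"


def elapsed_minutes(start_time: str, log_time: str) -> int:
    """Mirror the scripted field: (log_time - start_time) in ms, / 1000 / 60."""
    start = datetime.strptime(start_time, LOG_TIME_FORMAT)
    end = datetime.strptime(log_time, LOG_TIME_FORMAT)
    millis = int((end - start).total_seconds() * 1000)
    # Integer division, as with long values in the script
    return millis // 1000 // 60


# Sample CLOSED record: created 01/01/2018, closed 31/01/2018 (30 days apart)
minutes = elapsed_minutes("01/01/2018 00:00:00.000", "31/01/2018 00:00:00.000")
print(minutes)  # 30 days * 24 h * 60 min = 43200
```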

If you want to find the time for which a ticket was open you could use

    json { source => "message" remove_field => [ "message" ] }
    # Parse log_time into @timestamp
    date { match => [ "log_time", "dd/MM/YYYY HH:mm:ss.SSS" ] }
    if [event] == "CREATED" {
        aggregate {
            task_id => "%{job_id}"
            # Remember when this job was created
            code => 'map["start_time"] = event.get("@timestamp")'
            map_action => "create"
        }
    }
    if [event] == "CLOSED" {
        aggregate {
            task_id => "%{job_id}"
            # Duration in seconds between CREATED and CLOSED
            code => 'event.set("duration", event.get("@timestamp") - map["start_time"])'
            map_action => "update"
            end_of_task => true
            # Seconds before an unfinished map is discarded
            timeout => 10
        }
    }
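
To make the pairing behaviour of the two aggregate blocks more concrete, here is a rough Python sketch of the same idea: a map keyed by job_id holds the start time until the matching CLOSED event arrives. It is only an illustration of the logic, not how Logstash is implemented, and the name `pair_events` is made up. It ignores timeouts entirely.

```python
from datetime import datetime

# Python equivalent of the dd/MM/yyyy HH:mm:ss.SSS layout used in the logs
LOG_TIME_FORMAT = "%d/%m/%Y %H:%M:%S.%f"


def pair_events(events):
    """Return CLOSED events annotated with a duration in seconds."""
    open_jobs = {}  # job_id -> start time, playing the role of the aggregate map
    closed = []
    for ev in events:
        ts = datetime.strptime(ev["log_time"], LOG_TIME_FORMAT)
        if ev["event"] == "CREATED":
            # Like map_action => "create": only store if no map exists yet
            open_jobs.setdefault(ev["job_id"], ts)
        elif ev["event"] == "CLOSED" and ev["job_id"] in open_jobs:
            # Like map_action => "update" with end_of_task => true
            start = open_jobs.pop(ev["job_id"])
            closed.append({**ev, "duration": (ts - start).total_seconds()})
    return closed


# Interleaved events taken from the second sample log in this thread
sample = [
    {"log_time": "27/01/2018 00:00:00.000", "event": "CREATED", "job_id": 1},
    {"log_time": "27/01/2018 00:00:00.000", "event": "CREATED", "job_id": 5},
    {"log_time": "27/01/2018 01:00:00.000", "event": "RANDOM", "job_id": 2},
    {"log_time": "28/01/2018 00:00:00.000", "event": "CLOSED", "job_id": 5},
    {"log_time": "29/01/2018 00:00:00.000", "event": "CLOSED", "job_id": 1},
]

for record in pair_events(sample):
    print(record["job_id"], record["duration"])
```

Because the map is keyed by job_id, interleaved CREATED events from different users or sessions do not interfere with each other.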

If your concern is that tickets can be open for a very long time then you could use an approach like this.

The number of tickets created/closed in a time period and the average time a ticket was open are questions I would answer using elasticsearch, not logstash.

@Badger, thank you for explaining, this helps. I have one more question: what is the maximum value I can put in "timeout => 10"? Can it be 3 days, e.g.
timeout => 24 * 3 * 60?
And what happens if I don't set timeout at all?

Have a look at Transform, it can be used to create session information out of your stream of events. Given that session information, you can query for things like average time a ticket was open.
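
As a rough illustration of what such a transform could look like, here is a sketch of a pivot configuration built as a Python dictionary and printed as JSON, which could then be sent as the body of a `PUT _transform/<id>` request. The index names `app-logs` and `job-durations` are hypothetical, and the sketch assumes `log_time` is mapped as a date and `event` as a keyword; treat it as a starting point to check against the Transform documentation rather than a tested configuration.

```python
import json

# Hypothetical index names; adjust to your own setup.
transform_body = {
    "source": {
        "index": "app-logs",
        # Only the lifecycle events matter for the duration
        "query": {"terms": {"event": ["CREATED", "CLOSED"]}},
    },
    "dest": {"index": "job-durations"},
    "pivot": {
        # One output document per job
        "group_by": {"job_id": {"terms": {"field": "job_id"}}},
        "aggregations": {
            "created_at": {"min": {"field": "log_time"}},
            "closed_at": {"max": {"field": "log_time"}},
            # Difference between latest and earliest event, in milliseconds
            "duration_ms": {
                "bucket_script": {
                    "buckets_path": {
                        "start": "created_at.value",
                        "end": "closed_at.value",
                    },
                    "script": "params.end - params.start",
                }
            },
        },
    },
}

print(json.dumps(transform_body, indent=2))
```

Note that with this sketch a job that has been created but not yet closed would show a duration of 0, since its earliest and latest events are the same.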

Transforming the data seems to be a better solution in my case; I will look into it. I have also found another way to achieve this, with the Elapsed filter plugin. The only issue is that it does not work if two logs arrive close together, e.g. CREATE and CLOSE, because Logstash queries Elasticsearch before the previous record has been processed.
I now have three ways to achieve this and need to find out which one suits me best.
@Badger and @Hendrik_Muhs, thank you for your help.

PS: "and what if i don't add timeout at all?" was a dumb question; there is a default value of 30 minutes :)
But still, what is the maximum value I can put there?

I am not aware of an upper limit of what you can set timeout to.

If you do not set it at all then the filter uses a default of 1800 seconds.
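
Since the default of 1800 seconds corresponds to 30 minutes, the timeout is expressed in seconds. That means 24 * 3 * 60 = 4320 would be 72 minutes, not three days. As far as I know the Logstash config expects a literal number there rather than an arithmetic expression, so the value has to be worked out beforehand. A small Python sketch of that arithmetic (the helper name `timeout_seconds` is made up):

```python
from datetime import timedelta


def timeout_seconds(days: int = 0, hours: int = 0, minutes: int = 0) -> int:
    """Convert a human-readable span into whole seconds for the timeout option."""
    span = timedelta(days=days, hours=hours, minutes=minutes)
    return int(span.total_seconds())


three_days = timeout_seconds(days=3)
default_timeout = timeout_seconds(minutes=30)

print(f"timeout => {three_days}")  # 3 * 24 * 60 * 60 = 259200
print(default_timeout)             # 1800, the default mentioned above
```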
