Calculate the time difference between consecutive documents

Can I calculate the time difference between consecutive documents in Elasticsearch?
Suppose there are 3 records: the timestamp of the first record is 10:45:00, the timestamp of the second is 10:47:00, and the timestamp of the third is 10:50:00. The time difference between the first and second records is 2 minutes, and the time difference between the second and third records is 3 minutes.
Is it possible to calculate the time difference between consecutive documents?


I don't think you can. This is something you typically need to solve at index time.
For example, have a look at the aggregate filter in Logstash.

Thanks for the heads-up @dadoonet. Would it be possible to calculate the time difference using the derivative or serial differencing aggregation?

In theory, it should be possible to use a date_histogram to aggregate documents at the (milli-)second level and then use a derivative of the timestamp, but in practice, I think performance would suffer a great deal.
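For illustration only, such a query might look roughly like the sketch below (the index and field names are assumptions, and this is untested). Each 1-second bucket carries the max timestamp, and the derivative then yields the millisecond gap from the previous bucket; the huge number of empty buckets is exactly where the performance concern comes from.

```json
GET my-index/_search
{
  "size": 0,
  "aggs": {
    "per_second": {
      "date_histogram": {
        "field": "logTimestamp",
        "interval": "1s"
      },
      "aggs": {
        "ts": { "max": { "field": "logTimestamp" } },
        "diff_ms": { "derivative": { "buckets_path": "ts" } }
      }
    }
  }
}
```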

Thanks @val. I tried the same solution and, as you said, performance-wise it is not a good practice.

Indeed, as suggested by @dadoonet and myself, you should capture that information at indexing time.


My data is coming from a stream and I am storing it directly into an Elasticsearch index. Can the Logstash aggregate filter work with streaming data?

Hi,
I did almost the same thing using Ruby in my Logstash configuration file. I used a ruby map to store the timestamp of the first record; when the second record is received, I calculate the difference using Ruby's Time.parse() function and then store the difference in the second record.
This continues for every record, but you need to make sure that your logs are read in sequence and with no multi-threading, otherwise you might get wrong results.

map['timeDifference']= (Time.parse(event.get('logTimestamp')).to_f - Time.parse(map['previousTime']).to_f).round(4);
event.set('timeDifference', map['timeDifference']);
map['previousTime'] = event.get('logTimestamp');
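As a standalone illustration of that calculation (plain Ruby, outside Logstash; the timestamps are made up to match the example at the top of the thread, and a zero difference is assumed for the first record):

```ruby
require 'time'

# Hypothetical sketch of the filter logic: remember the previous event's
# timestamp in a map and emit the gap (in seconds) for each event.
map = {}
timestamps = ["2018-02-15T10:45:00Z", "2018-02-15T10:47:00Z", "2018-02-15T10:50:00Z"]

differences = timestamps.map do |ts|
  previous = map['previousTime']
  # No previous record yet -> report 0.0 instead of failing on Time.parse(nil)
  diff = previous ? (Time.parse(ts).to_f - Time.parse(previous).to_f).round(4) : 0.0
  map['previousTime'] = ts
  diff
end

puts differences.inspect  # => [0.0, 120.0, 180.0]
```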

Thanks @MariumHassan. I am new to Logstash, so could you send me the script?


Sorry, I don't have the relevant script available at the moment. You can take help from the configuration examples. Modify them according to your needs, and add the code below in the filter section after making the required changes:

ruby {
  init => "@@map = {}"
  code => "@@map['timeDifference'] = (Time.parse(event.get('logTimestamp')).to_f - Time.parse(@@map['previousTime']).to_f).round(4);
           event.set('timeDifference', @@map['timeDifference']);
           @@map['previousTime'] = event.get('logTimestamp');"
}

You might need to update the grok pattern according to your logs.


Great job @MariumHassan


Hey @MariumHassan, can I take input from an Elasticsearch index and apply the ruby filter on that?

Hi,
You can query data from Elasticsearch and process it using any language you prefer. However, updating that data after processing it this way might be an issue. I think you will need to update by query, or you can simply get/store the document ID of the document you want to update and use it to update the document later.
This is helpful: Logstash Update a document in elasticsearch
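For example, updating a single document by its ID might look like the console-style sketch below (the index, type, document ID, and field value are placeholders, and the URL shape assumes a 6.x-era `_update` endpoint):

```json
POST test_index/doc/1/_update
{
  "doc": {
    "timeDifference": 120.0
  }
}
```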


Hi @MariumHassan, this is my script:

input {
  elasticsearch {
    hosts => "http://192.168.55.213:9200"
    index => "test_index"
  }
}
filter {
  aggregate {
    task_id => "%{logTimestamp}"
    code => "
      map['logTimestamp'] = event.get('logTimestamp');
      map['timeDifference'] = (Time.parse(event.get('logTimestamp')).to_f -
                               Time.parse(map['previousTime']).to_f).round(4);
      map['previousTime'] = event.get('logTimestamp');
    "
  }
}
output {
  elasticsearch {
    document_id => "%{logTimestamp}"
    document_type => "test_index1"
    index => "test_index1"
    codec => "json"
    hosts => ["192.168.55.213:9200"]
  }
}

When I try to execute this script, it gives this error:

Aggregate exception occurred {:error=>#<TypeError: can't dup NilClass>, :code=>"\n\t map['logTimestamp'] = event.get('logTimestamp');\n map['timeDifference']= (Time.parse(event.get('logTimestamp')).to_f - Time.parse(map['previousTime']).to_f).round(4);\n\tmap['previousTime'] = event.get('logTimestamp');\n\t \n ", :map=>{"logTimestamp"=>"2018-02-15T08:40:10Z"}, :event_data=>{"logTimestamp"=>"2018-02-15T08:40:10Z", "@timestamp"=>2018-04-25T09:47:38.312Z, "@version"=>"1"}}

Can you suggest what changes I have to make to run the script successfully?

Hi @MariumHassan, my script is working, but I am getting the same time in the previousTime field, and if I do not declare map['previousTime'] at the start of the script then it throws an exception.

Hi, you need to apply a check. Something like this:

if map['previousTime'].nil?
  map['previousTime'] = event.get('logTimestamp');
end

It's working @MariumHassan, but sometimes it does not give proper results, and when I add a query in the input to sort the results in ascending order, that query is not working at all (records are not inserted in ascending order).

This is my updated config file

input {
  elasticsearch {
    hosts => "http://192.168.55.213:9200"
    index => "test_index"
    type => "test_index"
    query => '{"sort": [{"logTimestamp": {"order": "asc"}}], "query": {"match_all": {}}}'
  }
}
filter {
  ruby {
    init => "@@map = {}"
    code => "
      @@map['previousTime'] = event.get('logTimestamp') if @@map['previousTime'].nil?
      @@map['timeDifference'] = (Time.parse(event.get('logTimestamp')).to_f -
                                 Time.parse(@@map['previousTime']).to_f);
      event.set('timeDifference', @@map['timeDifference']);
      event.set('previousTime', @@map['previousTime']);
      @@map['previousTime'] = event.get('logTimestamp');
    "
  }
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}
output {
  elasticsearch {
    document_type => "test_index3"
    index => "test_index3"
    codec => "json"
    hosts => ["192.168.55.213:9200"]
  }
}

Hi, I tried this in my kibana console:

GET _search
{
  "sort" : [
       {"logTimestamp": {"order": "asc"}}
       ],
  "query": {
    "match_all": {}
  }
}

It works fine for me. You get sorted input from elasticsearch and calculate the time difference between two consecutive documents. Now, you should add that time difference field in the current document by updating the existing one instead of inserting it again. Please check: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-action
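One way to get an ID for the update (a sketch for this setup, not tested): the elasticsearch input plugin can expose each source document's metadata via `docinfo => true`, which stores the original `_index` and `_id` under `[@metadata]`; the output can then reuse that ID with `action => "update"`. The host and index values below are copied from your config; everything else is illustrative.

```
input {
  elasticsearch {
    hosts => "http://192.168.55.213:9200"
    index => "test_index"
    # Expose the source document's _index and _id under [@metadata]
    docinfo => true
  }
}
output {
  elasticsearch {
    hosts => ["192.168.55.213:9200"]
    index => "test_index"
    # Reuse the original document's ID so it is updated in place
    document_id => "%{[@metadata][_id]}"
    action => "update"
  }
}
```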

It is asking for a document_id to update the document. What do I have to provide in document_id?

I tried multiple options like:
document_id => "%{Doc_id}"
document_id =>[@metadata][_id]
document_id =>"%{uid}"

but it's not working. I want to overwrite the same record with the added time_difference field.