Logstash filter for start and end events repeating

Hi,
I have a log in the below format.. (Not the complete log, only a sample)
2017-05-25T04:22:38.857 INFO - calling : Main.java method :main
2017-05-25T04:22:39.248 INFO - calling : PhiConfigParser.java method :getDocument
2017-05-25T04:22:49.248 INFO - calling : PhiConfigParser.java method :readFilePath
2017-05-25T04:22:50.248 INFO - calling : PhiConfigParser.java method :isConfigPathIncorrect
2017-05-25T04:22:51.648 INFO - Completed processing : PhiConfigParser.java method :isConfigPathIncorrect
2017-05-25T04:23:39.248 INFO - Completed processing : PhiConfigParser.java method :readFilePath
2017-05-25T04:23:49.248 INFO - calling : PhiConfigParser.java method :getDocumentBuilder
2017-05-25T04:24:39.256 INFO - Completed processing : PhiConfigParser.java method :getDocumentBuilder
2017-05-25T04:24:40.248 INFO - calling : PhiConfigParser.java method :readFilePath
2017-05-25T04:25:39.248 INFO - Completed processing : PhiConfigParser.java method :readFilePath
2017-05-25T04:25:44.280 INFO - Completed processing : PhiConfigParser.java method :getDocument
2017-05-25T04:26:01.157 INFO - Completed processing : Main.java method :main

If you observe, a method is called and logged as "calling" and when the method ends it is logged as "Completed processing".
I need to calculate the time taken between calling and completed and ingest that data.

There are 1000's of such methods in the log. Also, a method once completed can start again. For example, readFilePath is called 2 times.

I tried the below combination of elapsed and aggregate filters but it is ingesting all the 12 rows whereas there should have been only 5 rows. Please suggest what is wrong with this and also if there is any alternate method.

filter {
if([type] == "ScanningSessionlogs") {

grok {
	match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:mLogL} - calling : %{WORD:function}.java	 method :%{WORD:event}"]
	add_tag => [ "taskStarted" ]
  }
	
grok {
  match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:mLogL} - Completed processing : %{WORD:function}.java	 method :%{WORD:event}"]
  add_tag => [ "taskTerminated"]
}

elapsed {
	start_tag => "taskStarted"
	end_tag => "taskTerminated"
	unique_id_field => "event"
  }

aggregate {
  task_id => "%{event}"
  code => "map['report'] = (event.get('elapsed_time')).to_i"
  map_action => "create"
}

}

}

Thanks and appreciate your help.

Seems to me that for the elapsed filter your unique_id_field needs to be a concatenation of method, ., event i.e "PhiConfigParser. getDocument" so add a field in the grok filters.

I would also rename the fields from method, event to class, method (the event word in LS is conflated).

I think that you want to drop any events tagged as taskStarted using a conditional block.

I don't think you need the aggregate filter.

Guy,
I did the changes you suggested. It is working but with one problem.
For "elapsed_time" it is considering the current timestamp and not what is present in the log file.
How can i make it calculate the time from the timestamp present in log file?

Here is the modified filter:

filter {
grok {
match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:mLogL} - calling : %{WORD:functionName}.java method :%{WORD:methodName}"]
add_tag => [ "taskStarted" ]
}

grok {
  match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:mLogL} - Completed processing : %{WORD:functionName}.java	 method :%{WORD:methodName}"]
  add_tag => [ "taskTerminated"]
}

elapsed {
	start_tag => "taskStarted"
	end_tag => "taskTerminated"
	unique_id_field => "methodName"
	new_event_on_match => false
  }
  
  if "taskStarted" in [tags] {
	drop { }
  }

}

Consider extracting the timestamp from the log event and using that to update the @timestamp field using the 'date' filter. Do this prior to calling the elapsed filter and the result should then be what you expect.

2 Likes

To further what @Kryten said:

grok {
  # timestamp is not a good name for a date text field, renamed to logdate
  match => ["message", "%{TIMESTAMP_ISO8601: logdate} %{LOGLEVEL:mLogL} - Completed processing : %{WORD:functionName}.java  method :%{WORD:methodName}"]
  # methodName is not unique enough, some code could delegate to the same method name of a different class
  add_field => { "elapsedKey" => "%{functionName}.%{methodName}" }
  add_tag => [ "taskTerminated"]
}

# TODO add _grokparsefailure conditional check

date {
  match => [ "logdate", "ISO8601" ]
}

# TODO add _dateparsefailure conditional check

elapsed {
  start_tag => "taskStarted"
  end_tag => "taskTerminated"
  unique_id_field => "elapsedKey"
  new_event_on_match => false
}

if "taskStarted" in [tags] {
  drop { }
}
1 Like

Thanks @Kryten and @guyboertje, your solution worked for me.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.