Debugging a filter - optimizing for speed. How do I measure the speed of a plugin?

Hi all,

To monitor how fast or slow my Logstash filter configurations are, I usually record a timestamp ("tick") in the first filter and another one in the last filter and calculate the time difference between the two. The ticks are created by a small Ruby code snippet.
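In condensed form, the pattern looks roughly like this (field names are just the ones I use):

filter
{
	ruby
	{
		init => "require 'time'"
		code => "event.set('[logstash][processing][filterStart]', DateTime.now.strftime('%Q').to_i)"
	}
}

# ... the filters under test ...

filter
{
	ruby
	{
		init => "require 'time'"
		code => "
			currentTimeInt = DateTime.now.strftime('%Q').to_i
			event.set('[logstash][processing][filterTime]', currentTimeInt - event.get('[logstash][processing][filterStart]'))
		"
	}
}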

Now I've built a new filter and its processing speed is really poor. It takes about 1 second per event, which is not acceptable.

Now I want to know which part of my filter is making it slow.
One way would of course be to add more measurement points using the same pattern as before.

But in the Logstash filter plugins I see the following common option: enable_metric => true.
I assume this metric is designed for checking how fast filter plugins are running. Is that correct?

If so, how can I use them? How can I access the gathered data?

I'm using basic X-Pack.

Thanks, Andreas

I moved this over to Logstash; you are more likely to get an answer here!

OK, an update from my side: I've gotten a bit further, but new questions have come up.

I have the following combined configuration (cat * in the filter directory):

input 
{
  beats 
  {
    port => 5044
  }
}

filter
{
	#
	# logstash filter processing times part one: start time
	
	ruby
	{
		id => "logstash_processing_start_time_ruby"
		init => 
		"
			require 'time'
		"
		code =>
		"
			event.set('[logstash][processing][filterStart]', DateTime.now.strftime('%Q').to_i)
		"
	}

}

filter
{

	if [logType] == "rpcrouter" or [type] == "rpcrouter"
	{
		grok
		{
			id => "rpcrouter_grok_main"
			match => ['message','^%{FAST_TIMESTAMP:logTime} \[[^\]]+\] \w+ %{NOT_SPACE:javaClass} +- +%{GREEDYDATA:payload}']
			patterns_dir => ["C:\elk\logstash\conf_dev\patterns"]
		}
		
		# parse date
		date
		{
			id => "rpcrouter_date"
			match => [ "logTime", "yyyy-MM-dd HH:mm:ss,SSS" ]
			timezone => "Europe/Berlin"
		}
		
		if [javaClass] == "com.gedas.logiweb.api_internal.main.SessionHandler"
		{
			drop 
			{
				id => "rpcrouter_drop"
			}
		}
		

		# parse the multiline event
		ruby
		{
			id => "rpcrouter_ruby_multiline"
			init => 
			"
				require 'time'
			"
					
			code => '
				
timeStart=DateTime.now.strftime("%Q").to_i

a=event.get("message")

stackTrace=""
exception=""

# split to lines
lineNumber=0
b=a.split("\n")

userName="null"
ip="null"
apiMethod=""
if b.length > 1
    b.each{  |x| 
        if lineNumber == 1 and x.start_with? "Remote user"
            # e.g. "Remote user: <name>"
            delimiterPos = x.index(":")
            userName=x[delimiterPos+2..-1]
        elsif lineNumber==2 and x.start_with? "Remote address"
            delimiterPos = x.index(":")
            event.set("ip", x[delimiterPos+2..-1])
        elsif lineNumber==3 and x.start_with? "Method cal"
            delimiterPos = x.index(":")
            apiMethod= x[delimiterPos+2..-1]
            event.set("apiMethod",apiMethod)
        elsif lineNumber==6 and apiMethod=="login"
            userName=x
        elsif lineNumber==8 and apiMethod=="login"
            if x.start_with? "//"
                event.set("apiVersion",x[2..-1])
            else
                event.set("apiVersion",x)
            end 
        elsif x.start_with? "[" 
            if x.index("Exception")
                exception=x # store for later use
                # split the line by ;
                details=x.split(";")
                #puts details
                details.each{ |d|
                    if d.start_with? " msg="
                       event.set("exceptionMsg", d[5..-1])
                    elsif d.start_with? " targetException"
                        delimiterPos = d.index("=")
                        event.set("exceptionTarget", d[delimiterPos+1..-1])
                    end
                }
            end
        elsif x.start_with? "\tat"
            if stackTrace == ""
                stackTrace=exception + "\n" + x + "\n"
            else
                stackTrace=stackTrace + x + "\n"
            end   
        end 
        lineNumber=lineNumber+1
    
    }
    event.set("userName", userName)
    event.set("stackTrace", stackTrace)

end

timeEnd=DateTime.now.strftime("%Q").to_i
timeDiff=(timeEnd-timeStart)
event.set("[logstash][debug][rubyTime]", timeDiff)


			'
		}
		
		if "_grokparsefailure" not in [tags]
		{
			# build new message string
			mutate
			{
				id => "rpcrouter_mutate_replace"
				replace => { "message" => "%{userName} - %{apiMethod} - %{exceptionMsg}" }
			}
		}
		
		if "_dateparsefailure" not in [tags]
		{
			mutate
			{
				id => "rpcrouter_mutate_remove_date"
				remove_field => ['logTime']
			}
		}
	}
}

filter
{
	#
	# logstash filter processing times part two: end time

	ruby
	{
		id => "logstash_processing_end_time_ruby"
		init => 
		"
			require 'time'
		"
		code => 
		"
			currentTimeInt=DateTime.now.strftime('%Q').to_i
			event.set('[logstash][processing][filterEnd]', currentTimeInt)
			event.set('[logstash][processing][filterTime]',(currentTimeInt - event.get('[logstash][processing][filterStart]')))
		"
	}
	
	# parse additional timestamps to date
	date
	{
		id => "logstash_processing_end_time_date_1"
		match => [ '[logstash][processing][filterStart]', 'UNIX_MS' ]
		target => '[logstash][processing][filterStart]'
		timezone => "Europe/Berlin"
	}
	
	date
	{
		id => "logstash_processing_end_time_date_2"
		match => [ '[logstash][processing][filterEnd]', 'UNIX_MS' ]
		target => '[logstash][processing][filterEnd]'
		timezone => "Europe/Berlin"
	}

}

filter
{
	mutate
	{
		 id => "delete_beats_tag_mutate"
		 remove_tag => [ "beats_input_codec_plain_applied" ]
	}
}

output
{
	elasticsearch 
	{ 
		id => "elasticsearch_output_main"
		hosts => ["localhost:9200"] 
		index => "%{[logType]}-%{+YYYY.MM.dd}"
	}
	
}

So my own statistics are saved as logstash.processing.filterTime and logstash.debug.rubyTime.

The average of logstash.processing.filterTime looks really odd to me, so I have some reservations about putting this filter into production.
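For context, the average can be pulled with a simple aggregation over the index written by the output below; a minimal sketch, assuming the field got mapped as a number and using the rpcrouter-* index pattern that results from my %{[logType]}-%{+YYYY.MM.dd} naming:

GET rpcrouter-*/_search
{
  "size": 0,
  "aggs": {
    "avg_filterTime": {
      "avg": { "field": "logstash.processing.filterTime" }
    }
  }
}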

Since opening this topic I think I now understand how to use the metrics Logstash is writing. I added an id option to each filter plugin and found the gathered data in the Logstash monitoring index:

{
  "id": "main",
  "reloads": {
    "failures": 0,
    "successes": 0
  },
  "ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
  "hash": "06b99a6f6c1f469652fd3b3a68a4c58ecb820d1b050ece27b8c8b91ec0db2068",
  "queue": {
    "events_count": 0,
    "type": "memory"
  },
  "vertices": [
    {
      "id": "ea6766da744c7c1136356c78f9d62b8b6721003996f0d0e8176efff59b74641f",
      "queue_push_duration_in_millis": 973826,
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_out": 1383424
    },
    {
      "id": "rpcrouter_ruby_multiline",
      "duration_in_millis": 1521,
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_out": 112279,
      "events_in": 1383424
    },
    {
      "id": "rpcrouter_drop",
      "duration_in_millis": 7220170,
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_out": 0,
      "events_in": 1271145
    },
    {
      "id": "rpcrouter_mutate_remove_date",
      "duration_in_millis": 48,
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_out": 112279,
      "events_in": 112279
    },
    {
      "id": "delete_beats_tag_mutate",
      "duration_in_millis": 55,
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_out": 112279,
      "events_in": 112279
    },
    {
      "id": "logstash_processing_end_time_date_1",
      "duration_in_millis": 92,
      "events_out": 112279,
      "long_counters": [
        {
          "name": "matches",
          "value": 112279
        }
      ],
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_in": 112279
    },
    {
      "id": "rpcrouter_mutate_replace",
      "duration_in_millis": 203,
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_out": 112277,
      "events_in": 112277
    },
    {
      "id": "rpcrouter_date",
      "duration_in_millis": 1123,
      "events_out": 1383424,
      "long_counters": [
        {
          "name": "matches",
          "value": 1383422
        }
      ],
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_in": 1383424
    },
    {
      "id": "logstash_processing_start_time_ruby",
      "duration_in_millis": 53938,
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_out": 1383424,
      "events_in": 1383424
    },
    {
      "id": "logstash_processing_end_time_date_2",
      "duration_in_millis": 63,
      "events_out": 112279,
      "long_counters": [
        {
          "name": "matches",
          "value": 112279
        }
      ],
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_in": 112279
    },
    {
      "id": "rpcrouter_grok_main",
      "duration_in_millis": 2971,
      "events_out": 1383424,
      "long_counters": [
        {
          "name": "matches",
          "value": 1383422
        },
        {
          "name": "failures",
          "value": 2
        }
      ],
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_in": 1383424
    },
    {
      "id": "logstash_processing_end_time_ruby",
      "duration_in_millis": 1484,
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_out": 112279,
      "events_in": 112279
    },
    {
      "id": "elasticsearch_output_main",
      "duration_in_millis": 319680,
      "pipeline_ephemeral_id": "417d07f4-6144-4697-8f67-00e08a1d25de",
      "events_out": 112279,
      "events_in": 112279
    }
  ],
  "events": {
    "duration_in_millis": 8522978,
    "queue_push_duration_in_millis": 973826,
    "out": 1383424,
    "in": 1383424,
    "filtered": 1383424
  }
}
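I believe the same per-plugin figures can also be read live via the Logstash monitoring API instead of going through the monitoring index, e.g.:

curl -s 'http://localhost:9600/_node/stats/pipelines?pretty'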

Here are my new questions:

  1. I assumed the average processing time per filter plugin call to be duration_in_millis / events_in. I also assumed that the sum of these per-plugin averages would be somewhere near my measured logstash.processing.filterTime, but my filterTime is much higher (see the small calculation after this list).

  2. So where does the rest of my filterTime go?

  3. Why does the drop filter (id=rpcrouter_drop) take so long? Calculating duration_in_millis / events_in gives about 5.7 ms per drop, which feels incredibly long to me.
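For reference, the calculation behind questions 1 and 3, as a small Ruby snippet using the duration_in_millis and events_in values from the stats above:

# average processing time per event: duration_in_millis / events_in
# (values copied from the pipeline stats above)
stats = {
  "logstash_processing_start_time_ruby" => [53938,   1383424],
  "rpcrouter_grok_main"                 => [2971,    1383424],
  "rpcrouter_date"                      => [1123,    1383424],
  "rpcrouter_ruby_multiline"            => [1521,    1383424],
  "rpcrouter_drop"                      => [7220170, 1271145],
  "rpcrouter_mutate_replace"            => [203,     112277],
  "rpcrouter_mutate_remove_date"        => [48,      112279],
  "logstash_processing_end_time_ruby"   => [1484,    112279],
}
stats.each do |id, (millis, events_in)|
  printf("%-36s %8.4f ms/event\n", id, millis.to_f / events_in)
end
# rpcrouter_drop comes out at roughly 5.7 ms/event, everything else well below 0.1 ms/event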

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.