Logstash count events using ruby filter

I have the following pipeline (an example, not the full pipeline) where I need to count the number of events for EVERY file I am writing as output. For example, this pipeline creates a new file every 30 seconds with the events processed in those 30 seconds. This code is sort of working, except for two issues:

  1. Each trap_id is written as an entry in the output file (I just want ONE entry with the final count). Though I am only counting events here, my end goal is to count events per log_level, for example how many "Error", "Information", etc. Windows events.
  2. When I reset the trap_id counter, the first entry in the output file starts at ZERO instead of ONE.

Can someone please advise how I can address these issues?

I posted the same question on SO too.

input {
  beats {
    port => 5045
  }
}
filter {
    ruby {
        init => '
            @trap_id = 0
            @lasttimestmp = 0
        '
        code => '
            evnttme = event.get("[@metadata][ms]")

            if @lasttimestmp == evnttme
                @trap_id += 1
                event.set("lsttimestmp", @lasttimestmp)
                event.set("trap_id", @trap_id)
            else
                @trap_id = 0
                @lasttimestmp = evnttme
                event.set("lsttimestmp", evnttme)
                event.set("[@metadata][ms]", evnttme)
                event.set("trap_id", @trap_id)
            end
        '
    }
}

output {
    file {
        path => "output.log"
    }

    file {
        flush_interval => 30
        codec => line { format => "%{[@metadata][ts]}, %{[trap_id]}" }
        path => "C:/lgstshop/local/csv/output%{[@metadata][ms]}.csv"
    }
}

If I had to do this I would use a ruby script file. Keep track of the current 30-second window and count events of each type. If it is time to change files, then return an array of two events: one with the old file name, which includes the counts, and one with the new file name. Otherwise, just return an array of one event, being the current event.
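A minimal, untested sketch of that approach, assuming (as in the pipelines in this thread) that the window id is carried in [@metadata][ms] and the log level in a level field; the summary marker field and the file name are hypothetical:

# count_window.rb -- sketch only, loaded with ruby { path => "..." }
def register(params)
    @window = nil
    @counts = Hash.new(0)          # per-log_level counters for the current window
end

def filter(event)
    win = event.get("[@metadata][ms]")
    lvl = event.get("level")

    if @window.nil? || win == @window
        @window = win
        @counts[lvl] += 1
        return [event]             # still inside the current window
    end

    # Window rolled over: build one summary event for the old window,
    # then start counting the new window with the current event.
    summary = LogStash::Event.new(
        "summary"   => true,
        "window"    => @window,
        "info_cnt"  => @counts["information"],
        "warn_cnt"  => @counts["warn"],
        "error_cnt" => @counts["error"]
    )
    @window = win
    @counts = Hash.new(0)
    @counts[lvl] += 1
    [event, summary]               # the original event plus one summary event
end

The pipeline would reference it as ruby { path => "C:/lgstshop/count_window.rb" }; because the summary is its own event, the original events keep flowing to the other outputs untouched.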

Thank you! Is there any difference between doing it inline in the filter versus in a separate script file? I tried exactly what you suggested, but inline (like the example posted in the question), defining the @trap_id instance variable to keep track of the count. The problem is that the output file has entries like the ones below, instead of one single entry with the total (1647918042696, 3):

1647918042696, 0, 
1647918042696, 1, 
1647918042696, 2, 
1647918042696, 3, 

I can't call event.cancel, because I need the original event written to another output file at the same time.
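One way around event.cancel, sketched here under the same assumptions as the script-file example above (a hypothetical summary marker on the counts event): route the two kinds of events to different outputs with a conditional.

output {
    if [summary] {
        file {
            codec => line { format => "%{[window]}, %{[info_cnt]}, %{[warn_cnt]}, %{[error_cnt]}" }
            path  => "C:/lgstshop/local/csv/output%{[window]}.csv"
        }
    } else {
        file {
            path => "output.log"
        }
    }
}

That keeps one line per window in the counts file while every original event still reaches its own output.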

Here is my complete pipeline to give you an idea (I have zero experience with Logstash/Ruby; I am trying my best to put this together based on inputs from you and other members, thank you very much for the help!):

I need the "file" output to be one line with the counts per log_level, while the "csv" output contains the real events for that duration.

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.


input {
  beats {
    port => 5045
  }
}
filter {
    ruby {
        init => "
            require 'csv'   # make sure the CSV stdlib is loaded

            begin
                path = 'C:/lgstshop/local/'
                @ctime = Time.now.to_f
                # NB: this divides milliseconds by 30, so the id changes
                # every 30 ms; for a 30-second window it would be
                # (@ctime / 30).to_i
                id = (@ctime * 1000).to_i / 30
                @evt_file = 'output' + id.to_s + '.evt'
                csv_headers = ['Timestamp','Windows Event Timestamp','Log Level','Event ID']
                if File.zero?(path + @evt_file) || !File.exist?(path + @evt_file)
                    CSV.open(path + @evt_file, 'w') do |csv|
                        csv << csv_headers
                        csv << ['0','0','Error','0']
                        csv << ['1','1','Error','1']
                        csv << ['2','2','Information','2']
                        csv << ['3','3','Error','3']
                        csv << ['4','4','Error','4']
                    end
                end

                path2 = 'C:/lgstshop/local/csv/'
                @csv_file = 'output' + id.to_s + '.csv'
                csv2_headers = ['Timestamp','Info_Sum','Warn_Sum','Error_Sum']
                if File.zero?(path2 + @csv_file) || !File.exist?(path2 + @csv_file)
                    CSV.open(path2 + @csv_file, 'w') do |csv2|
                        csv2 << csv2_headers
                        csv2 << ['0','0','0','0']
                        csv2 << ['1','1','1','1']
                        csv2 << ['2','2','2','2']
                        csv2 << ['3','3','3','3']
                        csv2 << ['4','4','4','4']
                    end
                end
            end
        "

        code => '
            @ctime = Time.now.to_f
            event.set("[@metadata][ts]", (@ctime * 1000).to_i)
            event.set("[@metadata][ms]", (@ctime * 1000).to_i / 30)
        '
    }
}

filter {
    ruby {
        code => '
            event.remove("tags")
            event.remove("@version")
            event.remove("@timestamp")

            # flatten the nested "event" and "log" objects to the top level
            event.get("event").each { |k, v|
                event.set(k, v)
            }

            event.get("log").each { |k, v|
                event.set(k, v)
            }
            event.remove("log")
            event.remove("message")
        '
    }
}

filter {
    ruby {
        init => '
            @trap_id = 0
            @info_cnt = 0
            @warn_cnt = 0
            @error_cnt = 0
            @lasttimestmp = 0
        '
        code => '
            evnttme  = event.get("[@metadata][ms]")
            loglevel = event.get("level")

            if loglevel == "information"
                @info_cnt += 1
            end
            if loglevel == "warn"
                @warn_cnt += 1
            end
            if loglevel == "error"
                @error_cnt += 1
            end

            if @lasttimestmp == 0
                @lasttimestmp = evnttme
            end

            if @lasttimestmp == evnttme
                @trap_id += 1
                event.set("lsttimestmp", @lasttimestmp)
                event.set("trap_id", @trap_id)
                event.set("info_cnt", @info_cnt)
                event.set("warn_cnt", @warn_cnt)
                event.set("error_cnt", @error_cnt)
            else
                # NB: resetting to 0 here discards the current event, which
                # was already counted above -- this is the "first entry
                # starts at ZERO" problem from the question
                @trap_id = 0
                @info_cnt = 0
                @warn_cnt = 0
                @error_cnt = 0
                @lasttimestmp = evnttme
                event.set("lsttimestmp", evnttme)
                event.set("[@metadata][ms]", evnttme)
                event.set("trap_id", @trap_id)
                event.set("info_cnt", @info_cnt)
                event.set("warn_cnt", @warn_cnt)
                event.set("error_cnt", @error_cnt)
            end
        '
    }
}



filter {
    mutate {
        add_field => { "mycustomts" => "%{[@metadata][ts]}" }
    }
}


output {
    file {
        path => "output.log"
    }

    file {
        flush_interval => 30
        codec => line { format => "%{[@metadata][ts]}, %{[info_cnt]}, %{[warn_cnt]}, %{[error_cnt]}" }
        path => "C:/lgstshop/local/csv/output%{[@metadata][ms]}.csv"
    }

    csv {
        flush_interval => 30
        fields => [ "mycustomts", "created", "level", "code" ]
        path => "C:/lgstshop/local/output%{[@metadata][ms]}.evt"
    }
}




A script file returns an array of events (one or two in this case), so that is an easy way to create a second event.

You may be able to use a new_event_block as described in the code option of an aggregate filter, but I have not tested it. I do not know whether that is specific to aggregate or whether you can use it in a ruby filter. @Jenni's post here suggests it can be used in a ruby filter.
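An untested sketch of that idea, assuming (as that post suggests) that the ruby filter's inline code receives new_event_block the same way the aggregate filter's code option does; the field names are hypothetical:

ruby {
    code => '
        # emit an extra summary event alongside the original one
        summary = LogStash::Event.new("summary" => true, "trap_id" => event.get("trap_id"))
        new_event_block.call(summary)
    '
}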

Thank you @Badger. new_event_block seems like an interesting option; I will try it.
