How to use user-defined plugin

logstash.conf:

input {
                file {
                                type => "_doc"
                                path => "/data/mysql_*_log/slow.log"
                                codec => multiline {
                                        pattern => "^# User@Host:"
                                                negate => true
                                                what => "previous"
                                                max_bytes => "100kib"
                                                auto_flush_interval => 5
                                }
                }
}

filter {
        if [type] == "_doc" {
                  if ("multiline_codec_max_bytes_reached" in [tags]) {
                        drop {}
                  }

                  grok {
# User@Host: logstash[logstash] @ localhost [127.0.0.1]
# User@Host: logstash[logstash] @  [127.0.0.1]
#patterns_dir => ["./patterns"]
                          match => [ "message", "^# User@Host: (?<user_ignore>[\s\S]*)\[(?<user>[\s\S]*)\] @ (?<dbhost>[\s\S]*)? \[%{IP:clientip}\](\s+Id:\s+%{NUMBER:thread_id:int})?" ]
#match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWAORD}\] @ %{ALLWORD:dbhost}? \[%{IP:clientip}\]" ]
                  }
                  grok {
# Query_time: 102.413328  Lock_time: 0.000167 Rows_sent: 0  Rows_examined: 1970
                          match => [ "message", "^# Query_time: %{NUMBER:Query_time:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:Rows_sent:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:Rows_examined:int}"]
                  }

# Capture the time the query happened
                  grok {
                          match => [ "message", "^SET timestamp=%{NUMBER:time};%{SPACE}(?<sql>[\s\S]*)" ]
                  }

                  if "_grokparsefailure" in [tags] { drop {} }
                  if [sql] =~ "^call p_status.*" { drop{} }
                  mutate {
                        convert => { "time" => "integer" }
                  }

                  date {
                        match => ["time", "UNIX"]
                        remove_field => ["host", "message","user_ignore", "tags"]
                add_field => {"engine_type" => "mysqlslowlog" }
                  }
        }
    dbms_business {}
}

output {
  }
  stdout { codec =>rubydebug }
}

dbms_business.rb:

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "mysql2"

# This example filter will replace the contents of the default
# message field with whatever you specify in the configuration.
#
# It is only intended to be used as an example.
class LogStash::Filters::Dbms_business < LogStash::Filters::Base

   # Setting the config_name here is required. This is how you
   # configure this filter from your Logstash config.
   #
   # filter {
   #   example {
   #     message => "My message..."
   #   }
   # }
   #
   config_name "dbms_business"

   # Replace the message with this value.
   config :message, :validate => :string, :default => "Hello World!"




   public
   def register
      # Add instance variables
      @last_ip_time = 0
      @last_ip = ''
      @port_to_businesses=Hash.new()
      @valid_time = 86400
   end # def register

   public
   def filter(event)
      return unless filter?(event)
      ts = Time.now.to_i
      if event.get("engine_type") == "mongodbslowlog"
         business,port,groupname,productname,developer,operator,developer_dw,operator_dw,test,model = get_mongodb_business(ts, event.get("path"))

         if ( business == "" || business == nil )
            event.cancel()
         end
         event.set("business_name",business)
         event.set("ip",get_ip_address(ts))
         event.set("port",port)
         event.set("group_name",groupname)
         event.set("product_name",productname)
         event.set("developer",developer)
         event.set("operator",operator)
         event.set("developer_dw",developer_dw)
         event.set("operator_dw",operator_dw)
         event.set("test",test)
         event.set("buisness_model",model)
         event.set("time",event.get("@timestamp").to_i * 1000)
      elsif event.get("engine_type") == "mysqlslowlog"
         p,ignore = event.get("path").split(File::SEPARATOR).last(2)
         port = p.split('_')[1].to_i

         ip = get_ip_address(ts)
         business,groupname,productname,developer,operator,developer_dw,operator_dw,test,model,password = get_mysql_business(ts, port)
         event.set("business_name",business)
         event.set("ip",get_ip_address(ts))
         event.set("port",port)
         event.set("group_name",groupname)
         event.set("product_name",productname)
         event.set("developer",developer)
         event.set("operator",operator)
         event.set("developer_dw",developer_dw)
         event.set("operator_dw",operator_dw)
         event.set("test",test)
         event.set("buisness_model",model)
         event.set("time",event.get("time") * 1000)
		 thread_id=event.get("thread_id")
		 if (thread_id == "" || thread_id == nil)
		    event.set("dbname","")
		 else 
		    event.set("dbname",get_db_from_thread(port,password,thread_id))
		 end
      end
      # correct debugging log statement for reference
      # using the event.get API
      @logger.debug? && @logger.debug("Message is now: #{event.get("message")}")
      # filter_matched should go in the last line of our successful code
      filter_matched(event)
   end # def filter



   def get_conf_value_from_file(file,key)
      configfile=File.new(file, 'r')
      while ( line = configfile.gets)
         vars = line.chomp.split('=')
         if ( vars.first == key )
            vvv = vars.last
            break
         end
      end
      configfile.close
      return vvv
   rescue => err
      return nil
   end


   ......
   
   def get_db_from_thread(port,password,thread_id)
        client = Mysql2::Client.new(
			:host     => '127.0.0.1', 
			:port     => port,
			:username => 'db_monitor', 
			:password => password,
			:database => 'mysql',
			:encoding => 'utf8'
	    )
		 
		result = client.query("select db from information_schema.processlist where id=#{thread_id};")
		result.each do |item|
		   if item["db"]
		      return item["db"]
		   else
              return ""		   
           end
		end   
   end 
   


end # class LogStash::Filters::Dbms_business


recently,i add block code "get_db_from_thread" function and require "mysql2" ,then it does not work now. but before the added code,it worked well.

the reported error:

[2023-03-22T11:02:10,137][ERROR][logstash.agent           ] Cannot create pipeline {:reason=>"Couldn't find any filter plugin named 'dbms_business'. Are you sure this is correct? Trying to load the dbms_business filter plugin resulted in this error: Problems loading the requested plugin named dbms_business of type filter. Error: NameError NameError"}

i infered that require statement is wrong,but i don't know how to fix

thanks in advance

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.