logstash.conf:
input {
file {
type => "_doc"
path => "/data/mysql_*_log/slow.log"
codec => multiline {
pattern => "^# User@Host:"
negate => true
what => "previous"
max_bytes => "100kib"
auto_flush_interval => 5
}
}
}
filter {
if [type] == "_doc" {
if ("multiline_codec_max_bytes_reached" in [tags]) {
drop {}
}
grok {
# User@Host: logstash[logstash] @ localhost [127.0.0.1]
# User@Host: logstash[logstash] @ [127.0.0.1]
#patterns_dir => ["./patterns"]
match => [ "message", "^# User@Host: (?<user_ignore>[\s\S]*)\[(?<user>[\s\S]*)\] @ (?<dbhost>[\s\S]*)? \[%{IP:clientip}\](\s+Id:\s+%{NUMBER:thread_id:int})?" ]
#match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWAORD}\] @ %{ALLWORD:dbhost}? \[%{IP:clientip}\]" ]
}
grok {
# Query_time: 102.413328 Lock_time: 0.000167 Rows_sent: 0 Rows_examined: 1970
match => [ "message", "^# Query_time: %{NUMBER:Query_time:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:Rows_sent:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:Rows_examined:int}"]
}
# Capture the time the query happened
grok {
match => [ "message", "^SET timestamp=%{NUMBER:time};%{SPACE}(?<sql>[\s\S]*)" ]
}
if "_grokparsefailure" in [tags] { drop {} }
if [sql] =~ "^call p_status.*" { drop{} }
mutate {
convert => { "time" => "integer" }
}
date {
match => ["time", "UNIX"]
remove_field => ["host", "message","user_ignore", "tags"]
add_field => {"engine_type" => "mysqlslowlog" }
}
}
dbms_business {}
}
output {
}
stdout { codec =>rubydebug }
}
dbms_business.rb:
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "mysql2"
# This example filter will replace the contents of the default
# message field with whatever you specify in the configuration.
#
# It is only intended to be used as an example.
class LogStash::Filters::Dbms_business < LogStash::Filters::Base
# Setting the config_name here is required. This is how you
# configure this filter from your Logstash config.
#
# filter {
# example {
# message => "My message..."
# }
# }
#
config_name "dbms_business"
# Replace the message with this value.
config :message, :validate => :string, :default => "Hello World!"
public
def register
# Add instance variables
@last_ip_time = 0
@last_ip = ''
@port_to_businesses=Hash.new()
@valid_time = 86400
end # def register
public
def filter(event)
return unless filter?(event)
ts = Time.now.to_i
if event.get("engine_type") == "mongodbslowlog"
business,port,groupname,productname,developer,operator,developer_dw,operator_dw,test,model = get_mongodb_business(ts, event.get("path"))
if ( business == "" || business == nil )
event.cancel()
end
event.set("business_name",business)
event.set("ip",get_ip_address(ts))
event.set("port",port)
event.set("group_name",groupname)
event.set("product_name",productname)
event.set("developer",developer)
event.set("operator",operator)
event.set("developer_dw",developer_dw)
event.set("operator_dw",operator_dw)
event.set("test",test)
event.set("buisness_model",model)
event.set("time",event.get("@timestamp").to_i * 1000)
elsif event.get("engine_type") == "mysqlslowlog"
p,ignore = event.get("path").split(File::SEPARATOR).last(2)
port = p.split('_')[1].to_i
ip = get_ip_address(ts)
business,groupname,productname,developer,operator,developer_dw,operator_dw,test,model,password = get_mysql_business(ts, port)
event.set("business_name",business)
event.set("ip",get_ip_address(ts))
event.set("port",port)
event.set("group_name",groupname)
event.set("product_name",productname)
event.set("developer",developer)
event.set("operator",operator)
event.set("developer_dw",developer_dw)
event.set("operator_dw",operator_dw)
event.set("test",test)
event.set("buisness_model",model)
event.set("time",event.get("time") * 1000)
thread_id=event.get("thread_id")
if (thread_id == "" || thread_id == nil)
event.set("dbname","")
else
event.set("dbname",get_db_from_thread(port,password,thread_id))
end
end
# correct debugging log statement for reference
# using the event.get API
@logger.debug? && @logger.debug("Message is now: #{event.get("message")}")
# filter_matched should go in the last line of our successful code
filter_matched(event)
end # def filter
def get_conf_value_from_file(file,key)
configfile=File.new(file, 'r')
while ( line = configfile.gets)
vars = line.chomp.split('=')
if ( vars.first == key )
vvv = vars.last
break
end
end
configfile.close
return vvv
rescue => err
return nil
end
......
def get_db_from_thread(port,password,thread_id)
client = Mysql2::Client.new(
:host => '127.0.0.1',
:port => port,
:username => 'db_monitor',
:password => password,
:database => 'mysql',
:encoding => 'utf8'
)
result = client.query("select db from information_schema.processlist where id=#{thread_id};")
result.each do |item|
if item["db"]
return item["db"]
else
return ""
end
end
end
end # class LogStash::Filters::Dbms_business
recently,i add block code "get_db_from_thread" function and require "mysql2"
,then it does not work now. but before the added code,it worked well.
the reported error:
[2023-03-22T11:02:10,137][ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Couldn't find any filter plugin named 'dbms_business'. Are you sure this is correct? Trying to load the dbms_business filter plugin resulted in this error: Problems loading the requested plugin named dbms_business of type filter. Error: NameError NameError"}
i infered that require statement is wrong,but i don't know how to fix
thanks in advance