Problem Statement
I believe there is a bug in the ruby filter plugin when using event.remove.
Expected Output:
...
event_data.sql_text => "some sql text"
event_data.logType => "Oracle"
...
With no root-level fields (e.g., sql_text or logType); all data should reside under the event_data nested object.
Background:
I am using Docker instances to run an ES stack (3 Elasticsearch nodes, 1 Logstash).
I have tested this with the 6.0.0 and 6.3.0 Docker images for Logstash.
I'm pulling in data from a variety of sources; the specific issue is with the JDBC input, which pulls audit logs from an Oracle DB.
I standardize my data by putting the data into a nested object so I can search common tags across multiple sources (Windows/Linux/Oracle, etc.).
Enter my issue. I pull data in from an Oracle SELECT statement and then put it through a ruby filter to extract the key/value pairs and set new ones following my standard naming convention. When I remove the old root-level key/value pairs, the entire event seems to be deleted and nothing gets submitted to ES. While narrowing the issue down, I moved my event.remove statement outside of the loop and noticed that with two event.remove statements I get the same result: the event disappears. However, if I have only one remove statement outside of the loop, the data is intact, with only that specific field removed.
Sample Config
input {
  jdbc {
    jdbc_validate_connection => true
    jdbc_connection_string => "jdbc:oracle:thin:@system01.com:1521/sid01"
    jdbc_user => "user01"
    jdbc_password => "supersecretpassword"
    jdbc_driver_library => "/usr/share/logstash/lib/pluginmanager/ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    statement => "select audit_type, sessionid, os_username, userhost, dbusername, SYSTEM_PRIVILEGE_USED, SQL_TEXT, RETURN_CODE, event_timestamp from sys.unified_audit_trail where to_timestamp(event_timestamp, 'DD-MON-YY HH.MI.SS.FF AM') > :sql_last_value"
    schedule => "*/2 * * * *"
  }
}
filter {
  ruby {
    code => "
      require 'time'
      require 'date'
      event.set('[insertTime]', Time.now.to_i)
    "
  }
  date {
    match => ["insertTime", "UNIX"]
    target => "insertTime"
  }
  if [logType] == "Oracle" {
    # Set the timestamp to the one of dba_audit_trail
    mutate { convert => [ "timestamp", "string" ] }
    date { match => ["timestamp", "ISO8601"] }
    if [comment_text] =~ /(?i)Authenticated by/ {
      grok {
        match => [ "comment_text", "^.*(?i)Authenticated by: (?<authenticated_by>.*?)\;.*$" ]
      }
      if [comment_text] =~ /(?i)EXTERNAL NAME/ {
        grok {
          match => [ "comment_text", "^.*(?i)EXTERNAL NAME: (?<external_name>.*?)\;.*$" ]
        }
      }
    }
    # remove temporary fields
    mutate { remove_field => ["timestamp"] }
  }
  ruby {
    code => '
      event.to_hash.each { |key, value|
        event.set("[event_data][#{key}]", value)
      }
    '
  }
}
output {
  elasticsearch {
    hosts => ["elastic01:9200"]
    index => "audit_databases_oracle-%{+YYYY.MM.dd}"
  }
}
I modified the config from this: Push Oracle Auditing to ES
The interesting parts are in the ruby filter.
Problem Area
ruby {
  code => '
    event.to_hash.each { |key, value|
      event.set("[event_data][#{key}]", value)
      event.remove("[#{key}]")
    }
  '
}
The above code results in NO data being entered into ES.
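For comparison, the same iterate-and-remove pattern run against a plain Ruby hash behaves as expected. This is only a stand-in for the event, not the real Logstash Event API; it assumes (as I understand it) that event.to_hash returns a copy, so mutating the event while iterating over that copy should be safe:

```ruby
# Plain-Ruby stand-in for the event: a mutable hash of root-level fields.
# NOTE: this simulates my understanding that event.to_hash returns a copy;
# it is not the real Logstash Event object.
event = {
  "sql_text" => "some sql text",
  "logType"  => "Oracle"
}

# Iterate over a snapshot (like event.to_hash), nest each field
# under event_data, then remove the root-level original.
event.to_a.each do |key, value|
  (event["event_data"] ||= {})[key] = value
  event.delete(key)
end

# Only the nested object remains; the hash itself is not destroyed.
puts event.keys.inspect
```

Nothing here deletes the whole hash, which is why the Logstash behavior looks like a bug to me rather than a mistake in the loop.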
However, isolating the problem to this snippet results in the data being nested under event_data, with only the one field removed:
ruby {
  code => '
    event.to_hash.each { |key, value|
      event.set("[event_data][#{key}]", value)
      event.remove("sql_text")
    }
  '
}
Sample Output:
...
event_data.sql_text => "some sql text"
event_data.logType => "Oracle"
logType => "Oracle"
...
Additionally, adding a second event.remove call results in no data being put into ES:
ruby {
  code => '
    event.to_hash.each { |key, value|
      event.set("[event_data][#{key}]", value)
      event.remove("sql_text")
      event.remove("logType")
    }
  '
}
Here is where I believe the issue is:
Calling event.remove twice results in the deletion of the entire event, instead of just the specified key fields.
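One thing I have not ruled out (purely a guess on my part): event.to_hash also includes reserved fields such as @timestamp and @version, so the full loop removes those too, and my elasticsearch output's index name (audit_databases_oracle-%{+YYYY.MM.dd}) depends on @timestamp. If that were the cause, a guarded loop that skips reserved fields should work where the unguarded one fails:

```
ruby {
  code => '
    event.to_hash.each { |key, value|
      # Guess: leave reserved fields (@timestamp, @version, @metadata) untouched
      next if key.start_with?("@")
      event.set("[event_data][#{key}]", value)
      event.remove("[#{key}]")
    }
  '
}
```

If this guarded version submits data while the original does not, the problem would be the removal of reserved fields rather than event.remove itself.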