Ruby Filter - event.remove Inconsistent Results

Problem Statement

I believe there is a bug in the ruby filter plugin when using event.remove.

Expected Output:

...
event_data.sql_text => "some sql text"
event_data.logType => "Oracle"
...

With no root-level fields (e.g., sql_text or logType); all data should reside under the event_data nested object.

Background:

I am using Docker instances to run an Elastic Stack (3 Elasticsearch nodes, 1 Logstash).

I have tested this with the Logstash Docker images for versions 6.0.0 and 6.3.0.

I'm pulling in data from a variety of sources; the one I'm having an issue with is the JDBC input, which pulls audit logs from an Oracle DB.

I standardize my data by putting it into a nested object so I can search common tags across multiple sources (Windows/Linux/Oracle, etc.).

Enter my issue. I pull data in with an Oracle SELECT statement, then run it through a ruby filter to extract the key/value pairs and set new ones following my standard naming convention. When I remove the old root key/value pairs, the event seems to be deleted and nothing gets submitted to ES. While narrowing down the issue, I moved my event.remove statement outside of the loop and noticed that with two event.remove statements I get the same empty result, but with only one remove statement outside of the loop the data is intact, with only that specific field removed.

Sample Config

input {
    jdbc {
        jdbc_validate_connection => true
        jdbc_connection_string => "jdbc:oracle:thin:@system01.com:1521/sid01"
        jdbc_user => "user01"
        jdbc_password => "supersecretpassword"
        jdbc_driver_library => "/usr/share/logstash/lib/pluginmanager/ojdbc8.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        statement => "select audit_type, sessionid, os_username, userhost, dbusername, SYSTEM_PRIVILEGE_USED, SQL_TEXT, RETURN_CODE, event_timestamp from sys.unified_audit_trail where to_timestamp(event_timestamp, 'DD-MON-YY HH.MI.SS.FF AM') > :sql_last_value"
        schedule => "*/2 * * * *"
    }
}

filter {
    ruby {
        code => "
            require 'time'
            require 'date'
            event.set('[insertTime]', Time.now.to_i)
        "
    }
    date {
        match => ["insertTime", "UNIX"]
        target => "insertTime"
    }
    if [logType] == "Oracle" {
        # Set the timestamp to the one of dba_audit_trail
        mutate { convert => [ "timestamp", "string" ] }
        date { match => ["timestamp", "ISO8601"] }

        if [comment_text] =~ /(?i)Authenticated by/ {
            grok {
                match => [ "comment_text", "^.*(?i)Authenticated by: (?<authenticated_by>.*?)\;.*$" ]
            }
            if [comment_text] =~ /(?i)EXTERNAL NAME/ {
                grok {
                    match => [ "comment_text", "^.*(?i)EXTERNAL NAME: (?<external_name>.*?)\;.*$" ]
                }
            }
        }

        # remove temporary fields
        mutate { remove_field => ["timestamp"] }
    }
    ruby {
        code => '
            event.to_hash.each { |key, value|
                event.set("[event_data][#{key}]", value)
            }
        '
    }
}

output {
    elasticsearch {
        hosts => ["elastic01:9200"]
        index => "audit_databases_oracle-%{+YYYY.MM.dd}"
    }
}

I modified the config from this: Push Oracle Auditing to ES

The interesting parts are in the ruby filter.

Problem Area

    ruby {
        code => '
            event.to_hash.each { |key, value|
                event.set("[event_data][#{key}]", value)
                event.remove("[#{key}]")
            }
        '
    }

The above code will result in NO data being entered into ES.

However, isolating the problem to this snippet results in the data being nested under event_data, with only one field removed:

    ruby {
        code => '
            event.to_hash.each { |key, value|
                event.set("[event_data][#{key}]", value)
                event.remove("sql_text")
            }
        '
    }

Sample Output:

...
event_data.sql_text => "some sql text"
event_data.logType => "Oracle"
logType => "Oracle"
...

Additionally, adding a second event.remove results in no data being put into ES:

    ruby {
        code => '
            event.to_hash.each { |key, value|
                event.set("[event_data][#{key}]", value)
                event.remove("sql_text")
                event.remove("logType")
            }
        '
    }

Here is where I believe the issue is:

Calling event.remove twice results in the deletion of the entire event, instead of just the specific fields.

If I run

output { stdout { codec => rubydebug } }
input { generator { count => 1 message => '{"foo": "1", "bar": 2}' } }
filter {
    json { source => "message" }
    ruby {
        code => '
            event.to_hash.each { |key, value|
                event.set("[event_data][#{key}]", value)
                event.remove("[#{key}]")
            }
        '
    }
}

Then I get all the data moved under event_data:

{
    "event_data" => {
          "sequence" => 0,
           "message" => "{\"foo\": \"1\", \"bar\": 2}",
               "bar" => 2,
          "@version" => "1",
              "host" => "[...]",
               "foo" => "1",
        "@timestamp" => 2018-07-02T20:56:26.665Z
    }
}

If I add an elasticsearch output, then I get an error:

[2018-07-02T16:59:41,979][ERROR][org.logstash.execution.WorkerLoop] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.
org.jruby.exceptions.RaiseException: (LogStash::Error) timestamp field is missing

Try

          if key != "@timestamp"
            event.set("[event_data][#{key}]", value)
            event.remove("[#{key}]")
          end
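
For context, dropped into the full ruby filter from above, that suggestion reads:

    ruby {
        code => '
            event.to_hash.each { |key, value|
                if key != "@timestamp"
                    event.set("[event_data][#{key}]", value)
                    event.remove("[#{key}]")
                end
            }
        '
    }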


Though this correctly avoids putting @timestamp under event_data, it does not fix the issue: I still end up with no data being submitted to ES. I am also monitoring the Logstash/ES logs and see no errors.

So my data is still being removed entirely.

Doing some more testing.

This code snippet results in no data being submitted to ES:

        event.to_hash.each { |key, value|
                if key != "sql_text" || key != "logType"
                        event.set("[event_data][#{key}]", value)
                        event.remove("[#{key}]")
                end
        }

Yet this results in data in ES, with sql_text only under the nested event_data object and no root-level sql_text field:

        event.to_hash.each { |key, value|
                if key == "sql_text"
                        event.set("[event_data][#{key}]", value)
                        event.remove("[#{key}]")
                end
        }

The difference is event.remove being run twice vs. once.

I believe there is something going on behind the scenes that I am unaware of: either my logic is wrong, or there is a bug in Logstash's event.remove functionality.

if key != "sql_text" || key != "logType"

is effectively the same as:

if true

The key will always be either not-equal-to-sql_text OR not-equal-to-logType: a key equal to one of them cannot equal the other, so at least one side of the || is always true. To skip both keys you want &&, not ||.
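
To see it concretely, here is a quick check in plain Ruby (userhost is just an example key):

    ["sql_text", "logType", "userhost"].each do |key|
      puts "#{key}: OR  => #{key != "sql_text" || key != "logType"}"   # true for every key
      puts "#{key}: AND => #{key != "sql_text" && key != "logType"}"   # true only for userhost
    end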

Which means you are moving @timestamp out of the root, as @Badger mentioned, and that should be producing an error in the Elasticsearch output.

Do you have a Dead Letter Queue enabled, and if so, are items being sent there?

I would also recommend adding a debug-style output while you're getting your config sorted, which will pretty-print each event (this will output to stdout):

output {
  stdout {
    codec => rubydebug
  }
}

Yeah, I had my logic flipped on that.

Here is what I've tested with your recommendations:

  • Turned on the Dead Letter Queue
    • Settings set in logstash.yml:

log.level: debug
dead_letter_queue.enable: true
path.dead_letter_queue: /usr/share/logstash/logs

Here is the updated ruby code snippet:

    ruby {
        code => '
            event.to_hash.each { |key, value|
                if key != "@timestamp" && key != "@version"
                    event.set("[event_data][#{key}]", value)
                    event.remove("[#{key}]")
                end
            }
        '
    }
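
An equivalent variant, for reference, snapshots the keys first and reads each value back through the Event API (a sketch of the same idea, not a confirmed fix):

    ruby {
        code => '
            # Take the list of top-level keys up front, then move each
            # field via event.get/event.set/event.remove.
            event.to_hash.keys.each { |key|
                next if key == "@timestamp" || key == "@version"
                event.set("[event_data][#{key}]", event.get(key))
                event.remove(key)
            }
        '
    }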

Additionally, I've added the stdout output with the rubydebug codec. However, I am running under Docker and I'm not seeing any of this data in the Logstash log.

This all adds up to nothing being added to Elasticsearch. However, if only one event.remove statement is executed, whether inside the loop or statically outside of it, I do get the data in ES.

I'm not sure what else can be confirmed besides this possibly being a bug inside Logstash.


Okay, I can get the debug data now. I had to enable a port via Docker. Found here.

Here is an example of the data I am getting from a sample event. I've redacted some sensitive information.

The raw output is below. However, I believe the interesting piece is here:

[2018-07-05T14:55:33,217][DEBUG][logstash.pipeline        ] output received {"event"=>{"@timestamp"=>2018-07-05T14:55:33.039Z, "@version"=>"1", "event_data"=>{"os_process"=>"51118512", "event_timestamp"=>2018-07-05T14:52:31.045Z, "audit_type"=>"Standard", "object_name"=>"UserTEST", "action_name"=>"DROP USER", "unified_audit_policies"=>"ORA_ACCOUNT_MGMT, ORA_SECURECONFIG", "system_privilege_used"=>"DROP USER", "logType"=>"Oracle", "sql_text"=>"DROP USER UserTEST\u0000", "sessionid"=>#<BigDecimal:60d634bd,'0.272393717E9',9(12)>, "fga_policy_name"=>nil, "return_code"=>#<BigDecimal:5ecfe7b1,'0.0',1(4)>, "userhost"=>"ubuntu", "dbusername"=>"SVC_User_AUDIT1", "external_userid"=>nil, "insertTime"=>2018-07-05T14:55:33.000Z, "os_username"=>"User1234"}}}

You can see that I am able to put everything under the event_data nested object, but no data is being submitted to ES.

I am still not able to see anything in the dead letter queue, so I don't see any error explaining why this wouldn't submit to ES.

Partial output (trimmed due to length requirements):

[2018-07-05T14:55:33,129][DEBUG][logstash.pipeline        ] filter received {"event"=>{"event_timestamp"=>2018-07-05T14:52:30.162Z, "audit_type"=>"Standard", "object_name"=>"UserTEST", "action_name"=>"CREATE USER", "unified_audit_policies"=>"ORA_ACCOUNT_MGMT, ORA_SECURECONFIG", "system_privilege_used"=>"CREATE USER", "@timestamp"=>2018-07-05T14:55:33.023Z, "@version"=>"1", "sql_text"=>"CREATE USER UserTEST IDENTIFIED BY NULL", "sessionid"=>#<BigDecimal:311bce39,'0.272393717E9',9(12)>, "logType"=>"Oracle", "fga_policy_name"=>nil, "return_code"=>#<BigDecimal:462051e1,'0.0',1(4)>, "userhost"=>"ubuntu", "dbusername"=>"SVC_User_AUDIT1", "external_userid"=>nil, "os_username"=>"User1234", "os_process"=>"51118512"}}
[2018-07-05T14:55:33,132][DEBUG][logstash.pipeline        ] output received {"event"=>{"@timestamp"=>2018-07-05T14:55:32.998Z, "@version"=>"1", "event_data"=>{"os_process"=>"51118512", "event_timestamp"=>2018-07-05T14:50:51.705Z, "audit_type"=>"Standard", "object_name"=>"UserTEST", "action_name"=>"DROP USER", "unified_audit_policies"=>"ORA_ACCOUNT_MGMT, ORA_SECURECONFIG", "system_privilege_used"=>"DROP USER", "logType"=>"Oracle", "sql_text"=>"DROP USER UserTEST\u0000", "sessionid"=>#<BigDecimal:6cb8ad51,'0.272393717E9',9(12)>, "fga_policy_name"=>nil, "return_code"=>#<BigDecimal:7346eb29,'0.0',1(4)>, "userhost"=>"ubuntu", "dbusername"=>"SVC_User_AUDIT1", "external_userid"=>nil, "insertTime"=>2018-07-05T14:55:33.000Z, "os_username"=>"User1234"}}}

Is the path you set as the dead letter queue mounted from your host OS, or is it a path solely inside your docker container? If you're unsure, I would recommend disabling the DLQ until you get the other issues sorted out.

If possible, I would also recommend cutting docker out of your setup entirely while you debug, and only start using docker once you have a working configuration.

I had it mounted from my host OS into the Docker container.

I could try cutting Docker out, but I don't think that is the issue.

I was able to solve my problem, though not with a programmatic approach: I did a static rename of every field.

I ended up using mutate { rename => ... } to accomplish this.

Snippet:

mutate { rename => {
    ...
    "pid" => "[event_data][pid]"
    "os_username" => "[event_data][os_username]"
    "sql_text" => "[event_data][sql_text]"
    "return_code" => "[event_data][return_code]"
    ...
} }
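
A quick way to sanity-check the renames locally, in the same spirit as the generator repro earlier in the thread (the field values here are made up):

    input { generator { count => 1 message => '{"sql_text": "select 1", "return_code": 0}' } }
    filter {
        json { source => "message" }
        mutate { rename => {
            "sql_text" => "[event_data][sql_text]"
            "return_code" => "[event_data][return_code]"
        } }
    }
    output { stdout { codec => rubydebug } }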

I also tried using the JSON API to see if I could accomplish this instead of using hashes/dictionaries, but I was still unable to modify the data correctly.

I can certainly add data via the ruby filter with event.set("[event_data][#{key}]", value), but any time I try to remove the old fields, I end up with nothing in my event. Adding if statements to skip @version/@timestamp still results in no data.

Can't seem to find a root cause for this result.

While the rename approach isn't ideal, since I have to statically rename every field, I can't seem to find a better solution.
