Adding multiple values to an array

Hello,

I have an array which contains a bunch of time stamps which get added every time the record is modified from the source PGSQL database. I'm looking to modify this array to contain both a string and a time stamp.

Here is my current code snippet:

  elasticsearch {
    ssl => true
    ca_file => '/etc/logstash/ca.crt'
    hosts => [<REDACTED>]
    index => "test_vulnerability"
    user => "logstash_internal"
    query => "_id:%{source_console}-%{asset_id}-%{vulnerability_id}"
    fields => { "scan_history" => "temp" }
    fields => { "data_source" => "temp1" }
  }

  mutate {
    convert => { "last_assessed_for_vulnerabilities" => "string" }
  }

  if [last_assessed_for_vulnerabilities] in [temp] {
    drop { }
  }
  else {
    mutate { add_field => { "temp" => "%{last_assessed_for_vulnerabilities}" } }
    mutate { rename => { "temp" => "scan_history" } }
  }

  if "Nexpose" not in [temp1] {
    mutate { add_field => { "temp1" => "Nexpose" } }
    mutate { rename => { "temp1" => "data_source" } }
  }

The config above pulls an array field called "scan_history" from ES and adds a time stamp to it each time the record is modified. If the time stamp in "last_assessed_for_vulnerabilities" is already in the array, the event is dropped. If it's not, it is added to "temp", which is then renamed to "scan_history" and pushed to ES.

Here is my current array as an example and this works as expected:

"scan_history": [
      "2022-02-04T21:41:27.006Z",
      "2022-02-07T15:44:36.146Z"
    ],

I'd now like to bring the "data_source" field, which the ES filter stores in a field called "temp1", into the array. I'm hoping to achieve the following result...

"scan_history": [
      {
        "data_source": "Nexpose",
        "scan_time": "2022-02-04T21:41:27.006Z"
      },
      {
        "data_source": "Nexpose",
        "scan_time": "2022-02-07T15:44:36.146Z"
      }
    ],

I'd appreciate any help. From the examples I've seen it looks like I may need to use ruby but I'm not sure how to implement it.

Thank you!

I have not tested this, but you are going to want something like

ruby {
    code => '
            scanHistory = event.get("scan_history")
            if scanHistory.is_a? Array
                newScanHistory = []
                scanHistory.each { |x|
                    newScanHistory <<  { "data_source" => "Nexpose", "scan_time" => x } 
                }
                event.set("scan_history", newScanHistory)
            end
    '
}

Hi Badger,

Trying to grasp the code here. What is the behavior if "scan_history" does not exist? If this field does not exist, I'd like it created with "data_source" and "scan_time".

Thanks!

It would be a no-op. You could try something like

        if scanHistory.is_a? Array
            # The same as before
        else
            event.set("scan_history", { "data_source" => "Nexpose", "scan_time" => nil })
        end

So I played around and made some progress.

  elasticsearch {
    ssl => true
    ca_file => '/etc/logstash/ca.crt'
    hosts => 
    index => "test_vulnerability-163"
    user => "logstash_internal"
    query => "_id:%{source_console}-%{asset_id}-%{vulnerability_id}"
    fields => { "scan_history" => "temp" }
    fields => { "data_source" => "temp1" }
  }

  mutate {
    convert => { "last_assessed_for_vulnerabilities" => "string" }
  }

  if [last_assessed_for_vulnerabilities] in [temp] {
    drop { }
  }
  else {
    ruby {
      code => '
        event.set("test", [{"source" => event.get("temp1"), "scan_time" => event.get("last_assessed_for_vulnerabilities")}])
      '
    }

  mutate { add_field => { "temp" => "%{test}" } }
  mutate { rename => { "temp" => "scan_history" } }

I'm getting this output...

    "test": [
      {
        "source": "nexpose",
        "scan_time": "2022-02-07T20:39:03.488Z"
      }
    ]

But when I attempt to append it to my existing array called "temp", I seem to lose the nested fields.

  "scan_history": [
      "{source=nexpose, scan_time=2022-02-07T15:44:36.146Z}",
      "{source=nexpose, scan_time=2022-02-07T20:39:03.488Z}"
    ],

The sprintf reference calls the StringInterpolation code. If the referenced field is an object then it is converted to a string.
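To illustrate the difference, here is a rough sketch in plain Ruby, run outside Logstash. The variable names are made up for the example, and the exact text that Logstash's own interpolation produces differs from Ruby's, but the effect is the same: once a hash goes through string interpolation it is just text, whereas appending the hash itself keeps the nested keys.

```ruby
# Plain Ruby, outside Logstash; variable names are illustrative only.
entry = { "source" => "nexpose", "scan_time" => "2022-02-07T20:39:03.488Z" }
history = [{ "source" => "nexpose", "scan_time" => "2022-02-07T15:44:36.146Z" }]

# Interpolation turns the hash into a String, so the nested keys are lost.
as_text = "#{entry}"

# Appending the hash itself keeps it as a structured element of the array.
history << entry

puts as_text.class       # String
puts history.last.class  # Hash
```

Inside a ruby filter the same idea would be to read the array with event.get, append the hash, and write it back with event.set, instead of going through add_field with a %{...} reference.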

It looks like it overwrites the scan_history field each time the record is updated instead of appending to it. I'm doing some more debugging on my end.

  elasticsearch {
    ssl => true
    ca_file => '/etc/logstash/ca.crt'
    hosts => 
    index => "test_vulnerability-163"
    user => "logstash_internal"
    query => "_id:%{source_console}-%{asset_id}-%{vulnerability_id}"
    fields => { "scan_history" => "temp" }
    fields => { "data_source" => "temp1" }
  }

  mutate {
    convert => { "last_assessed_for_vulnerabilities" => "string" }
  }

  if [last_assessed_for_vulnerabilities] in [temp] {
    drop { }
  }
  else {
    ruby {
      code => '
        scanHistory = event.get("temp")
        if scanHistory.is_a? Array
          newScanHistory = []
          scanHistory.each { |x|
            newScanHistory <<  { "source" => "nexpose", "scan_time" => x }
          }
          event.set("scan_history", newScanHistory)
        else
          event.set("scan_history", { "source" => "nexpose", "scan_time" => event.get("last_assessed_for_vulnerabilities")})
        end
      '
    }

Hi Badger,

I made some progress, but I'm having issues parsing the array unless I specify the object #. Example: [0][1].

This statement is meant to prevent a new timestamp from being added to the array. I can only match if I specify the object #.

if [last_assessed_for_vulnerabilities] in [temp][0][scan_time] {
    drop { }
  }

I'm also having the same issue when parsing the "temp" array. Same problem as above.

    ruby {
      code => '
        scanHistory = event.get("temp")
        newScanHistory = []
        scanHistory.each { |x|
          newScanHistory <<  { "source" => event.get("[temp][0][source]"), "scan_time" => event.get("[temp][0][scan_time]") }
          newScanHistory <<  { "source" => "nexpose", "scan_time" => event.get("last_assessed_for_vulnerabilities") }
        }
        event.set("scan_history", newScanHistory)
      '
    }

I appreciate your help. I'm very close I think.

Since [temp][0][scan_time] is a string, that is a substring match. The other use of "in" is an array membership test, but it only tests equality with each array member. It will not look at structure within the array member if it is an object.
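As a loose analogy in plain Ruby (not the Logstash conditional itself, and with sample values borrowed from this thread), the same distinction shows up between a whole-element membership test and a test that looks inside each element:

```ruby
# Plain Ruby analogy; this is not how Logstash evaluates "in" internally.
temp = [
  { "source" => "nexpose", "scan_time" => "2022-02-08T02:28:27.332Z" }
]
last = "2022-02-08T02:28:27.332Z"

# Membership compares whole elements, and a String never equals a Hash.
whole_element_match = temp.include?(last)

# Looking at a key inside each element needs an explicit block.
nested_match = temp.any? { |x| x["scan_time"] == last }

# On a String, include? is a substring test.
substring_match = temp[0]["scan_time"].include?("2022-02-08")

puts whole_element_match  # false
puts nested_match         # true
puts substring_match      # true
```

That is why checking every element regardless of its index has to be done in a ruby filter rather than in a conditional.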

If you use output { stdout { codec => rubydebug } } then what do the [temp] and [last_assessed_for_vulnerabilities] fields look like, and what do you want the resulting event to look like?

Here are the fields from a sample event...

"last_assessed_for_vulnerabilities" => "2022-02-08T06:38:47.116Z",

"temp" => [
        [0] {
               "source" => "nexpose",
            "scan_time" => "2022-02-08T02:28:27.332Z"
        }

So three things....

  1. I'd like to drop the event if "last_assessed_for_vulnerabilities" is contained in "temp". This means I've already assessed the vulnerability and it can be disregarded.

  2. If "temp" does not exist, create it and add "last_assessed_for_vulnerabilities" to the record. I can then rename it to "scan_history". Subsequent updates to this record will fall under the 3rd item below.

Example:

"temp" => [
        [0] {
               "source" => "nexpose",
            "scan_time" => "2022-02-08T06:38:47.116Z"
        }
  3. If it hasn't been seen, I'd like to append "last_assessed_for_vulnerabilities" to "temp", and push the event to a field called "scan_history" where logstash updates the record on ES. (example below)
 "temp" => [
        [0] {
               "source" => "nexpose",
            "scan_time" => "2022-02-08T02:28:27.332Z"
        }
        [1] {
              "source" => "nexpose",
            "scan_time" => "2022-02-08T06:38:47.116Z"
        }
    ],

Thank you.

OK, so

input { generator { count => 1 codec => json lines => [
 '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "A" } ] }',
 '{ "last": "A" }',
 '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "B" }, { "source": "nexpose", "scan_time": "C" } ] }',
 '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "B" }, { "source": "nexpose", "scan_time": "A" } ] }'
] } }
filter {
    ruby {
        code => '
            last = event.get("last")
            if last
                temp = event.get("temp")
                if ! temp
                    event.set("temp", { "source" => "nexpose", "scan_time" => last })
                else
                    found = false
                    temp.each { |x|
                        if x["scan_time"] == last
                            event.cancel
                            found = true
                        end
                    }
                    if ! found
                        temp << { "source" => "nexpose", "scan_time" => last }
                    end
                    event.set("temp", temp)
                end
            end
        '
    }
}

will delete the first and last events, and for the other two produce

{
...
      "last" => "A",
      "temp" => {
       "source" => "nexpose",
    "scan_time" => "A"
}
}
{
...
      "last" => "A",
      "temp" => [
    [0] {
           "source" => "nexpose",
        "scan_time" => "B"
    },
    [1] {
           "source" => "nexpose",
        "scan_time" => "C"
    },
    [2] {
           "source" => "nexpose",
        "scan_time" => "A"
    }
]
}

It's throwing up a Ruby exception.

[2022-02-09T19:59:27,756][ERROR][logstash.filters.ruby ][main][c588053e303396ef132238dd796d1a0eacd3a7a2dfbaaef09b803b3dbefbb7b2] Ruby exception occurred: no implicit conversion of String into Integer {:class=>"TypeError", :backtrace=>["org/jruby/RubyArray.java:1501:in []'", "(ruby filter code):11:in block in filter_method'", "org/jruby/RubyHash.java:1415:in each'", "(ruby filter code):10:in block in filter_method'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.7/lib/logstash/filters/ruby.rb:93:in inline_script'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.7/lib/logstash/filters/ruby.rb:86:in filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159:in do_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:178:in block in multi_filter'", "org/jruby/RubyArray.java:1821:in each'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:175:in multi_filter'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:134:in multi_filter'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:299:in block in start_workers'"]}

Here is the JSON for the event in question...

"last_assessed_for_vulnerabilities" => "2022-02-09T19:49:33.402Z",
"scan_history" => {
           "source" => "nexpose",
        "scan_time" => "2022-02-09T06:44:20.793Z"
    },

With no scan history, it adds the first value no problem. Once the record is updated, it throws the exception. Here is the code:

  elasticsearch {
    ssl => true
    ca_file => '/etc/logstash/ca.crt'
    hosts => 
    index => "test_vulnerability-163"
    user => "logstash_internal"
    query => "_id:%{source_console}-%{asset_id}-%{vulnerability_id}"
    fields => { "scan_history" => "temp" }
    fields => { "data_source" => "temp1" }
  }

  ruby {
    code => '
      last = event.get("last_assessed_for_vulnerabilities")
      if last
        temp = event.get("temp")
        if ! temp
          event.set("temp", { "source" => "nexpose", "scan_time" => last })
        else
          found = false
          temp.each { |x|
            if x["scan_time"] == last
              event.cancel
              found = true
            end
          }
          if ! found
            temp << { "source" => "nexpose", "scan_time" => last }
          end
          event.set("temp", temp)
        end
      end
    '
  }

  mutate { rename => { "temp" => "scan_history" } }

Thanks.

That error is telling you that x is not a hash, it's an array. So it looks like [temp] is an array of arrays.

"scan_history" => {
           "source" => "nexpose",
        "scan_time" => "2022-02-09T06:44:20.793Z"
    },

This is the value of temp, which looks like a typical array.

Any suggestions?

That's not an array, it is a hash.

Ok. You mentioned that the filter is complaining about temp being an array within an array but my Ruby debug output shows that it is a hash. Any suggestions on how to make the filter work as intended?

Thank you.

I misinterpreted the error message. This

input { generator { count => 1 codec => json lines => [
 '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "A" } ] }',
 '{ "last": "A", "temp": { "source": "nexpose", "scan_time": "A" } }',
 '{ "last": "A", "temp": { "source": "nexpose", "scan_time": "B" } }',
 '{ "last": "A" }',
 '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "B" }, { "source": "nexpose", "scan_time": "C" } ] }',
 '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "B" }, { "source": "nexpose", "scan_time": "A" } ] }'
] } }
filter {
    ruby {
        code => '
            last = event.get("last")
            if last
                temp = event.get("temp")
                if ! temp
                    event.set("temp", { "source" => "nexpose", "scan_time" => last })
                elsif temp.is_a? Array
                    found = false
                    temp.each { |x|
                        if x["scan_time"] == last
                            event.cancel
                            found = true
                        end
                    }
                    if ! found
                        temp << { "source" => "nexpose", "scan_time" => last }
                    end
                    event.set("temp", temp)
                elsif temp.is_a? Hash
                    if temp["scan_time"] == last
                        event.cancel
                    end
                end
            end
        '
    }
}

will handle temp whether it is a hash or an array.
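For anyone puzzled by the earlier exception, here is a small plain Ruby sketch of what I believe was happening (the variable names are only for illustration). When temp is a hash, each with a single block parameter hands over every entry as a [key, value] array, and indexing that array with a string raises the TypeError seen in the log.

```ruby
# Plain Ruby, outside Logstash; reproduces the shape of the failing event.
temp = { "source" => "nexpose", "scan_time" => "2022-02-09T06:44:20.793Z" }

# Hash#each yields each entry as a [key, value] pair, so x is an Array here.
pairs = []
temp.each { |x| pairs << x }

# Indexing an Array with a String raises TypeError.
error = nil
begin
  pairs.first["scan_time"]
rescue TypeError => e
  error = e
end

puts pairs.first.inspect
puts error.class
```

This matches the backtrace, which goes through RubyHash each and then RubyArray [].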

No ruby exception error this time, but "last" does not get added to "temp" when a hash is detected, so I do not see two entries in my "temp". Do we need to add "temp << { "source" => "nexpose", "scan_time" => last }" after the last elsif statement? I did try it, but it fired an exception again.

Here is the JSON from my test.

"last_assessed_for_vulnerabilities" => 2022-02-10T06:38:50.351Z,

"temp" => {
        "source" => "nexpose",
        "scan_time" => "2022-02-09T06:44:20.793Z"
    }

The code:

  ruby {
    code => '
      last = event.get("last_assessed_for_vulnerabilities")
      if last
        temp = event.get("temp")
        if ! temp
          event.set("temp", { "source" => "nexpose", "scan_time" => last })
        elsif temp.is_a? Array
          found = false
          temp.each { |x|
            if x["scan_time"] == last
              event.cancel
              found = true
            end
          }
          if ! found
            temp << { "source" => "nexpose", "scan_time" => last }
          end
          event.set("temp", temp)
        elsif temp.is_a? Hash
          if temp["scan_time"] == last
            event.cancel
          end
        end
      end
    '
  }

Change the is_a? Hash branch to

                if temp["scan_time"] == last
                    event.cancel
                else
                    event.set("temp", [ temp, { "source" => "nexpose", "scan_time" => last } ])
                end
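If you want the three cases (missing, hash, array) in one place, a possible variation is to normalize temp to an array first. This is only a sketch in plain Ruby that I have not run inside Logstash. The helper name updated_history is hypothetical, and unlike the code above it stores the very first entry as a one-element array rather than a bare hash.

```ruby
# Hypothetical helper, not part of Logstash. Returns the updated history,
# or nil when "last" is already recorded and the event should be dropped.
def updated_history(temp, last, source = "nexpose")
  # Normalize: nil -> [], a single Hash -> [Hash], an Array is copied.
  history = case temp
            when nil   then []
            when Hash  then [temp]
            when Array then temp.dup
            else raise ArgumentError, "unexpected type #{temp.class}"
            end

  # Already seen: signal the caller to drop the event.
  return nil if history.any? { |x| x["scan_time"] == last }

  history + [{ "source" => source, "scan_time" => last }]
end

b = { "source" => "nexpose", "scan_time" => "B" }
puts updated_history(nil, "A").inspect
puts updated_history(b, "A").inspect
puts updated_history([b], "B").inspect
```

In the filter you would call event.cancel when the result is nil and event.set("temp", result) otherwise.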

Thank you Badger... everything is working. Incredible insight on this. I really appreciate the knowledge you offer.