Hello,
I have an array of time stamps; a new one is added every time the record is modified in the source PostgreSQL database. I'd like to change this array so that each entry contains both a string and a time stamp.
Here is my current code snippet:
elasticsearch {
  ssl => true
  ca_file => '/etc/logstash/ca.crt'
  hosts => [<REDACTED>]
  index => "test_vulnerability"
  user => "logstash_internal"
  query => "_id:%{source_console}-%{asset_id}-%{vulnerability_id}"
  fields => { "scan_history" => "temp" }
  fields => { "data_source" => "temp1" }
}
mutate {
  convert => { "last_assessed_for_vulnerabilities" => "string" }
}
if [last_assessed_for_vulnerabilities] in [temp] {
  drop { }
}
else {
  mutate { add_field => { "temp" => "%{last_assessed_for_vulnerabilities}" } }
  mutate { rename => { "temp" => "scan_history" } }
}
if "Nexpose" not in [temp1] {
  mutate { add_field => { "temp1" => "Nexpose" } }
  mutate { rename => { "temp1" => "data_source" } }
}
The code above pulls the array field "scan_history" from ES into "temp" and adds a time stamp to it each time the record is modified. If the time stamp in "last_assessed_for_vulnerabilities" is already in the array, the event is dropped. If it's not, it is added to "temp", which is then renamed to "scan_history" and pushed to ES.
Here is my current array as an example and this works as expected:
"scan_history": [
"2022-02-04T21:41:27.006Z",
"2022-02-07T15:44:36.146Z"
],
I'm now hoping to bring the "data_source" field, which the ES filter stores in a field called "temp1", into the array. I'm hoping to achieve the following result...
"scan_history": [
  {
    "data_source": "Nexpose",
    "scan_time": "2022-02-04T21:41:27.006Z"
  },
  {
    "data_source": "Nexpose",
    "scan_time": "2022-02-07T15:44:36.146Z"
  }
],
I'd appreciate any help. From the examples I've seen it looks like I may need to use ruby but I'm not sure how to implement it.
Thank you!
Badger
February 7, 2022, 6:27pm
2
I have not tested this, but you are going to want something like
ruby {
  code => '
    scanHistory = event.get("scan_history")
    if scanHistory.is_a? Array
      newScanHistory = []
      scanHistory.each { |x|
        newScanHistory << { "data_source" => "Nexpose", "scan_time" => x }
      }
      event.set("scan_history", newScanHistory)
    end
  '
}
Hi Badger,
Trying to grasp the code here. What is the behavior if "scan_history" does not exist? If this field does not exist, I'd like it created with "data_source" and "scan_time".
Thanks!
Badger
February 7, 2022, 8:57pm
4
It would be a no-op. You could try something like
if scanHistory.is_a? Array
  # The same as before
else
  event.set("scan_history", { "data_source" => "Nexpose", "scan_time" => nil })
end
So I played around and made some progress.
elasticsearch {
  ssl => true
  ca_file => '/etc/logstash/ca.crt'
  hosts =>
  index => "test_vulnerability-163"
  user => "logstash_internal"
  query => "_id:%{source_console}-%{asset_id}-%{vulnerability_id}"
  fields => { "scan_history" => "temp" }
  fields => { "data_source" => "temp1" }
}
mutate {
  convert => { "last_assessed_for_vulnerabilities" => "string" }
}
if [last_assessed_for_vulnerabilities] in [temp] {
  drop { }
}
else {
  ruby {
    code => '
      event.set("test", [{"source" => event.get("temp1"), "scan_time" => event.get("last_assessed_for_vulnerabilities")}])
    '
  }
  mutate { add_field => { "temp" => "%{test}" } }
  mutate { rename => { "temp" => "scan_history" } }
}
I'm getting this output...
"test": [
{
"source": "nexpose",
"scan_time": "2022-02-07T20:39:03.488Z"
}
]
But when I attempt to append it to my existing array called "temp", I seem to lose the nested fields.
"scan_history": [
"{source=nexpose, scan_time=2022-02-07T15:44:36.146Z}",
"{source=nexpose, scan_time=2022-02-07T20:39:03.488Z}"
],
Badger
February 7, 2022, 9:05pm
6
The sprintf reference calls the StringInterpolation code. If the referenced field is an object, it is converted to a string, which is why the nested fields are lost.
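To illustrate, here is a minimal plain-Ruby sketch, runnable outside Logstash, of why turning the object into text loses the nested fields while appending the object itself keeps them. The helper name append_entry is made up for this illustration. Inside a ruby filter the same idea would be to event.get the existing array, append the hash, and event.set the result, rather than going through a "%{test}" reference.

```ruby
# Hypothetical helper, not part of Logstash: returns a new history array
# with the entry appended as a hash, so its nested fields survive.
def append_entry(history, entry)
  (history || []) + [entry]
end

entry = { "source" => "nexpose", "scan_time" => "2022-02-07T20:39:03.488Z" }
history = [{ "source" => "nexpose", "scan_time" => "2022-02-07T15:44:36.146Z" }]

# String interpolation (roughly what a sprintf reference does to an object)
# flattens the hash into text; the keys are no longer addressable.
as_text = "#{entry}"
puts as_text.class

# Appending the hash itself keeps "source" and "scan_time" as real fields.
updated = append_entry(history, entry)
puts updated.last["scan_time"]
```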
JeremyP
February 7, 2022, 10:16pm
7
It looks like this overwrites the scan_history field each time the record is updated rather than appending to it. I'm doing some more debugging on my end.
elasticsearch {
  ssl => true
  ca_file => '/etc/logstash/ca.crt'
  hosts =>
  index => "test_vulnerability-163"
  user => "logstash_internal"
  query => "_id:%{source_console}-%{asset_id}-%{vulnerability_id}"
  fields => { "scan_history" => "temp" }
  fields => { "data_source" => "temp1" }
}
mutate {
  convert => { "last_assessed_for_vulnerabilities" => "string" }
}
if [last_assessed_for_vulnerabilities] in [temp] {
  drop { }
}
else {
  ruby {
    code => '
      scanHistory = event.get("temp")
      if scanHistory.is_a? Array
        newScanHistory = []
        scanHistory.each { |x|
          newScanHistory << { "source" => "nexpose", "scan_time" => x }
        }
        event.set("scan_history", newScanHistory)
      else
        event.set("scan_history", { "source" => "nexpose", "scan_time" => event.get("last_assessed_for_vulnerabilities")})
      end
    '
  }
}
Hi Badger,
I made some progress, but I'm having issues parsing the array unless I specify the object #. Example: [0][1].
This statement is meant to prevent a new timestamp from being added to the array. I can only match if I specify the object #.
if [last_assessed_for_vulnerabilities] in [temp][0][scan_time] {
drop { }
}
I'm also having the same issue when parsing the "temp" array. Same problem as above.
ruby {
  code => '
    scanHistory = event.get("temp")
    newScanHistory = []
    scanHistory.each { |x|
      newScanHistory << { "source" => event.get("[temp][0][source]"), "scan_time" => event.get("[temp][0][scan_time]") }
      newScanHistory << { "source" => "nexpose", "scan_time" => event.get("last_assessed_for_vulnerabilities") }
    }
    event.set("scan_history", newScanHistory)
  '
}
I appreciate your help. I'm very close I think.
Badger
February 8, 2022, 5:49pm
9
Since [temp][0][scan_time] is a string, that is a substring match. The other use of "in" is an array membership test, but it only tests equality with each array member. It will not look at structure within an array member that is an object.
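In plain Ruby terms, a membership test compares each array member as a whole, so a bare time stamp never equals a hash that merely contains it. A minimal sketch, assuming entries shaped like the ones in this thread; seen? is a hypothetical helper name, not a Logstash function:

```ruby
# Hypothetical helper: true when any entry's "scan_time" equals the time stamp.
def seen?(history, timestamp)
  history.any? { |entry| entry.is_a?(Hash) && entry["scan_time"] == timestamp }
end

history = [
  { "source" => "nexpose", "scan_time" => "2022-02-08T02:28:27.332Z" },
  { "source" => "nexpose", "scan_time" => "2022-02-08T06:38:47.116Z" }
]
last = "2022-02-08T06:38:47.116Z"

# Whole-member equality: a string is never equal to a hash.
puts history.include?(last)   # false
# Looking inside each member finds the match.
puts seen?(history, last)     # true
```

This is the kind of check that has to be done in a ruby filter, since the conditional cannot look inside the objects.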
If you use output { stdout { codec => rubydebug } }
then what do the [temp] and [last_assessed_for_vulnerabilities] fields look like, and what do you want the resulting event to look like?
JeremyP
February 8, 2022, 6:49pm
10
Here are the fields from a sample event...
"last_assessed_for_vulnerabilities" => "2022-02-08T06:38:47.116Z",
"temp" => [
[0] {
"source" => "nexpose",
"scan_time" => "2022-02-08T02:28:27.332Z"
}
So three things:
1. I'd like to drop the event if "last_assessed_for_vulnerabilities" is contained in "temp". This means I've already assessed the vulnerability and it can be disregarded.
2. If "temp" does not exist, create it and add "last_assessed_for_vulnerabilities" to the record. I can then rename it to "scan_history". Subsequent updates to this record will fall under the 3rd item below.
Example:
"temp" => [
  [0] {
    "source" => "nexpose",
    "scan_time" => "2022-02-08T06:38:47.116Z"
  }
3. If it hasn't been seen, I'd like to append "last_assessed_for_vulnerabilities" to "temp", and push the event to a field called "scan_history" where logstash updates the record on ES. (example below)
"temp" => [
[0] {
"source" => "nexpose",
"scan_time" => "2022-02-08T02:28:27.332Z"
}
[1] {
"source" => "nexpose",
"scan_time" => "2022-02-08T06:38:47.116Z"
}
],
Thank you.
Badger
February 8, 2022, 7:26pm
11
OK, so
input { generator { count => 1 codec => json lines => [
  '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "A" } ] }',
  '{ "last": "A" }',
  '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "B" }, { "source": "nexpose", "scan_time": "C" } ] }',
  '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "B" }, { "source": "nexpose", "scan_time": "A" } ] }'
] } }
filter {
  ruby {
    code => '
      last = event.get("last")
      if last
        temp = event.get("temp")
        if ! temp
          event.set("temp", { "source" => "nexpose", "scan_time" => last })
        else
          found = false
          temp.each { |x|
            if x["scan_time"] == last
              event.cancel
              found = true
            end
          }
          if ! found
            temp << { "source" => "nexpose", "scan_time" => last }
          end
          event.set("temp", temp)
        end
      end
    '
  }
}
will delete the first and last events, and for the other two produce
{
...
"last" => "A",
"temp" => {
"source" => "nexpose",
"scan_time" => "A"
}
}
{
...
"last" => "A",
"temp" => [
[0] {
"source" => "nexpose",
"scan_time" => "B"
},
[1] {
"source" => "nexpose",
"scan_time" => "C"
},
[2] {
"source" => "nexpose",
"scan_time" => "A"
}
]
}
JeremyP
February 9, 2022, 8:10pm
12
It's throwing up a Ruby exception.
[2022-02-09T19:59:27,756][ERROR][logstash.filters.ruby ][main][c588053e303396ef132238dd796d1a0eacd3a7a2dfbaaef09b803b3dbefbb7b2] Ruby exception occurred: no implicit conversion of String into Integer {:class=>"TypeError", :backtrace=>["org/jruby/RubyArray.java:1501:in `[]'", "(ruby filter code):11:in `block in filter_method'", "org/jruby/RubyHash.java:1415:in `each'", "(ruby filter code):10:in `block in filter_method'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.7/lib/logstash/filters/ruby.rb:93:in `inline_script'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.7/lib/logstash/filters/ruby.rb:86:in `filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159:in `do_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:178:in `block in multi_filter'", "org/jruby/RubyArray.java:1821:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:175:in `multi_filter'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:134:in `multi_filter'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:299:in `block in start_workers'"]}
Here is the JSON for the event in question...
"last_assessed_for_vulnerabilities" => "2022-02-09T19:49:33.402Z",
"scan_history" => {
"source" => "nexpose",
"scan_time" => "2022-02-09T06:44:20.793Z"
},
With no scan history, it adds the first value no problem. Once the record is updated, it throws the exception. Here is the code:
elasticsearch {
  ssl => true
  ca_file => '/etc/logstash/ca.crt'
  hosts =>
  index => "test_vulnerability-163"
  user => "logstash_internal"
  query => "_id:%{source_console}-%{asset_id}-%{vulnerability_id}"
  fields => { "scan_history" => "temp" }
  fields => { "data_source" => "temp1" }
}
ruby {
  code => '
    last = event.get("last_assessed_for_vulnerabilities")
    if last
      temp = event.get("temp")
      if ! temp
        event.set("temp", { "source" => "nexpose", "scan_time" => last })
      else
        found = false
        temp.each { |x|
          if x["scan_time"] == last
            event.cancel
            found = true
          end
        }
        if ! found
          temp << { "source" => "nexpose", "scan_time" => last }
        end
        event.set("temp", temp)
      end
    end
  '
}
mutate { rename => { "temp" => "scan_history" } }
Thanks.
Badger
February 9, 2022, 9:18pm
13
JeremyP:
if x["scan_time"]
That error is telling you that x is not a hash, it's an array. So it looks like [temp] is an array of arrays.
JeremyP
February 9, 2022, 9:28pm
14
"scan_history" => {
"source" => "nexpose",
"scan_time" => "2022-02-09T06:44:20.793Z"
},
This is the value of temp, which looks like a typical array.
Any suggestions?
Badger
February 9, 2022, 11:36pm
15
That's not an array, it is a hash.
JeremyP
February 9, 2022, 11:44pm
16
Ok. You mentioned that the filter is complaining about temp being an array within an array but my Ruby debug output shows that it is a hash. Any suggestions on how to make the filter work as intended?
Thank you.
Badger
February 10, 2022, 12:18am
17
I misinterpreted the error message. This
input { generator { count => 1 codec => json lines => [
  '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "A" } ] }',
  '{ "last": "A", "temp": { "source": "nexpose", "scan_time": "A" } }',
  '{ "last": "A", "temp": { "source": "nexpose", "scan_time": "B" } }',
  '{ "last": "A" }',
  '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "B" }, { "source": "nexpose", "scan_time": "C" } ] }',
  '{ "last": "A", "temp": [ { "source": "nexpose", "scan_time": "B" }, { "source": "nexpose", "scan_time": "A" } ] }'
] } }
filter {
  ruby {
    code => '
      last = event.get("last")
      if last
        temp = event.get("temp")
        if ! temp
          event.set("temp", { "source" => "nexpose", "scan_time" => last })
        elsif temp.is_a? Array
          found = false
          temp.each { |x|
            if x["scan_time"] == last
              event.cancel
              found = true
            end
          }
          if ! found
            temp << { "source" => "nexpose", "scan_time" => last }
          end
          event.set("temp", temp)
        elsif temp.is_a? Hash
          if temp["scan_time"] == last
            event.cancel
          end
        end
      end
    '
  }
}
will handle temp whether it is a hash or an array.
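As for why the earlier version failed on a hash, here is a small plain-Ruby sketch of what appears to have happened. This is an interpretation of the backtrace, which mentions RubyHash each, not something confirmed against the Logstash internals. Iterating a hash with a single block parameter yields [key, value] arrays, and indexing an array with a string raises the TypeError from the log.

```ruby
as_hash  = { "source" => "nexpose", "scan_time" => "2022-02-09T06:44:20.793Z" }
as_array = [as_hash]

# Array#each yields each member, here a hash, so string keys work.
as_array.each { |x| puts x["scan_time"] }

# Hash#each yields [key, value] pairs; x is an Array, so x["scan_time"] fails.
begin
  as_hash.each { |x| x["scan_time"] }
rescue TypeError => e
  puts "TypeError: #{e.message}"
end
```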
JeremyP
February 10, 2022, 4:07pm
18
No ruby exception error this time, but "last" does not get added to "temp" when a hash is detected, so I do not see two entries in my "temp". Do we need to add "temp << { "source" => "nexpose", "scan_time" => last }" after the last elsif statement? I did try it, but it fired an exception again.
Here is the JSON from my test.
"last_assessed_for_vulnerabilities" => 2022-02-10T06:38:50.351Z,
"temp" => {
"source" => "nexpose",
"scan_time" => "2022-02-09T06:44:20.793Z"
}
The code:
ruby {
  code => '
    last = event.get("last_assessed_for_vulnerabilities")
    if last
      temp = event.get("temp")
      if ! temp
        event.set("temp", { "source" => "nexpose", "scan_time" => last })
      elsif temp.is_a? Array
        found = false
        temp.each { |x|
          if x["scan_time"] == last
            event.cancel
            found = true
          end
        }
        if ! found
          temp << { "source" => "nexpose", "scan_time" => last }
        end
        event.set("temp", temp)
      elsif temp.is_a? Hash
        if temp["scan_time"] == last
          event.cancel
        end
      end
    end
  '
}
Badger
February 10, 2022, 7:10pm
19
Change the is_a? Hash branch to
if temp["scan_time"] == last
  event.cancel
else
  event.set("temp", [ temp, { "source" => "nexpose", "scan_time" => last } ])
end
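For reference, the three branches can be sketched as one standalone function in plain Ruby, with no Logstash event involved. update_history is a hypothetical helper name; it returns nil where the filter would call event.cancel. Unlike the filter, this sketch always returns an array, even for the first entry. That is an assumption about what is wanted, not something tested against ES, though it would avoid the hash case on later updates. It wraps a lone hash as [temp] rather than using Array(temp), because Array() on a hash produces [key, value] pairs.

```ruby
# Hypothetical helper mirroring the filter's branches.
# Returns the updated history array, or nil when the time stamp was already
# recorded (where the filter would call event.cancel).
def update_history(temp, last, source = "nexpose")
  entries =
    case temp
    when nil   then []
    when Hash  then [temp]   # a lone entry stored as an object
    when Array then temp
    else raise ArgumentError, "unexpected history type: #{temp.class}"
    end

  return nil if entries.any? { |e| e["scan_time"] == last }

  entries + [{ "source" => source, "scan_time" => last }]
end

one = { "source" => "nexpose", "scan_time" => "B" }
p update_history(nil, "A")     # no history yet
p update_history(one, "A")     # lone hash, new time stamp
p update_history([one], "B")   # time stamp already recorded
```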
JeremyP
February 14, 2022, 5:31pm
20
Thank you Badger... everything is working. Incredible insight on this. I really appreciate the knowledge you offer.