Hi. I use Logstash 2.3.4 to read events from a Kafka broker and have a problem with the filtering.
I am trying to flatten the uniq_log_name1 object and turn its name into a tag. My problem is that I do not know how the event is represented inside the ruby filter once the Kafka input has decoded it as JSON.
My testing works fine on the command line, but with the Kafka input it fails because of the JSON decoding. I know this from testing with the json codec on stdin and from disabling the json codec on the Kafka input.
The object names 'uniq_log_name1' and 'uniq_log_name2' are dynamic. I want to make them part of the tags for the event.
The events look like this:
{
  uniq_log_name1: {
    property0: "zero",
    property1: "one",
    property2: "two",
    property3: "three"
  }
}
{
  uniq_log_name2: {
    property0: "zero",
    property1: "one",
    property2: "two",
    property3: "three"
  }
}
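To put the goal in plain Ruby terms (outside Logstash entirely, with made-up variable names), the transformation I am after is roughly this:

decoded = { 'uniq_log_name1' => { 'property0' => 'zero', 'property1' => 'one' } }

log_name, properties = decoded.first                   # 'uniq_log_name1', { 'property0' => 'zero', ... }
flattened = properties.merge('tags' => [ log_name ])
# => { 'property0' => 'zero', 'property1' => 'one', 'tags' => ['uniq_log_name1'] }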
When running the example from stdin, the result looks good:
{
    "message" => "property0:zero,property1:one,property2:two,property3:three",
    "@version" => "1",
    "@timestamp" => "2017-01-13T01:44:06.177Z",
    "host" => "logz",
    "tags" => [
        [0] "uniq_log_name1"
    ],
    "property0" => "zero",
    "property1" => "one",
    "property2" => "two",
    "property3" => "three"
}
But when using the Kafka input with the default json codec, Ruby gives an error:
Ruby exception occurred: undefined method `split' for nil:NilClass {:level=>:error}
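If I read the error right, event['message'] is nil by the time the first ruby filter runs, so the exception is just what Ruby raises for:

nil.split(': {')   # => NoMethodError: undefined method `split' for nil:NilClass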
Here is the config:
input {
  stdin {
    codec => json
  }
}
input {
  kafka {
    zk_connect => "127.0.0.1:2181"
    type => "logz"
    topic_id => "logz"
  }
}
filter {
  json {
    source => "message"
  }
  # The ruby code below strips the log type prefix and adds it as a tag.
  # http://stackoverflow.com/questions/24605331/logstash-grok-filter-name-fields-dynamically
  ruby {
    code => "
      fieldArray = event['message'].split(': {')
      for field in fieldArray
        field = field.delete '{'
        field = field.delete '}'
        field = field.delete '\"'
        #
        # This changes for ES 5.x:
        # adding tags after executing the ruby code filter.
        #
        # tags does not exist at first, so the log type is assigned as a tag.
        # The second time the for loop runs, tags is already defined, so
        # the message field is overwritten with the remaining message.
        # This removes the log type prefix and makes it possible for the next
        # ruby section to create key/value pairs.
        #
        if event['tags']
          event['message'] = field
        else
          event['tags'] = [ field ]
        end
      end
    "
  }
  # The ruby code below splits the log message on commas and dynamically names
  # fields based on their names in the log.
  ruby {
    code => "
      fieldArray = event['message'].split(',')
      for field in fieldArray
        field = field.delete '\"'
        result = field.split(':')
        event[result[0]] = result[1]
      end
    "
  }
}
output { stdout { codec => "rubydebug" } }
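As an aside, I suspect the second ruby block could be replaced with the kv filter (using field_split and value_split), though that would not solve the tagging problem:

kv {
  source => "message"
  field_split => ","
  value_split => ":"
}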
Now I understand why that fails, but I still cannot fix my problem. The reason is that the Kafka input uses the json codec by default, which changes the event object that is available inside the ruby filter. I cannot find documentation on how the decoded data is represented on that object.
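To figure out how the decoded data is actually represented, I am guessing a throwaway ruby filter like this would at least dump the top-level structure (untested, assuming the Logstash 2.x event API with to_hash):

ruby {
  code => "
    # untested debug sketch: print every top-level field the json codec created
    event.to_hash.each do |key, value|
      puts key + ' => ' + value.inspect
    end
  "
}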
In theory it should be as easy as something like:
ruby {
  code => "
    print event                    # prints 2017-01-13T02:00:29.130Z %{host} %{message}
    print '-------------------blank_space-------------------'
    print event['message']         # this is nil?
    print event['source']          # this is nil?
    event['message'].each do |key, value|
      event['tags'] = [ key ]
      event['message'] = value
    end
  "
}
But this also fails, because event seems to be a string. Where did the message data go? The resulting output still shows a JSON object, just as expected, but without my modifications.
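My current guess is that after the json codec runs there is no message field at all, and that I should be walking the decoded top-level fields instead, something along these lines (completely untested):

ruby {
  code => "
    # untested guess: flatten whichever top-level field holds a hash and
    # record its name as a tag, instead of splitting a raw message string
    event.to_hash.keys.each do |key|
      next if key.start_with?('@') || key == 'type' || key == 'tags'
      value = event[key]
      next unless value.is_a?(Hash)
      event['tags'] = [ key ]
      value.each { |k, v| event[k] = v }
      event.remove(key)
    end
  "
}

Does that look like the right direction, or is there a simpler way to get at the decoded fields?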
Thank you for your help.