LogStash::Event and Ruby code for dynamic tags with Logstash 2.3.4


#1

Hi. I use Logstash 2.3.4 to read events from a Kafka broker and have a problem with filtering.

I am trying to flatten the uniq_log_name1 object and turn uniq_log_name1 into a tag. My problem is not knowing what dynamic variable is available inside the Ruby code interpreter after the Kafka input decodes the event as JSON.

My testing works fine on the command line, but with the Kafka input it fails because of the JSON decoding. I know this from testing with the json codec on stdin and from disabling the json codec on the Kafka input.

The object names 'uniq_log_name1' and 'uniq_log_name2' are dynamic; I want to make them part of the event's tags.

The events look like this:

{
    uniq_log_name1: {
        property0: "zero",
        property1: "one",
        property2: "two",
        property3: "three"
    }
}

{
    uniq_log_name2: {
        property0: "zero",
        property1: "one",
        property2: "two",
        property3: "three"
    }
}

When running the example from stdin the result looks good:

{
    "message" => "property0:zero,property1:one,property2:two,property3:three",
    "@version" => "1",
    "@timestamp" => "2017-01-13T01:44:06.177Z",
    "host" => "logz",
    "tags" => [
        [0] "uniq_log_name1"
    ],
    "property0" => "zero",
    "property1" => "one",
    "property2" => "two",
    "property3" => "three"
}

But when using the Kafka input with the default JSON codec, Ruby gives an error:

Ruby exception occurred: undefined method `split' for nil:NilClass {:level=>:error}

Here is the config:

input {
    stdin {
        codec => json
    }
}

input {
    kafka {
        zk_connect => "127.0.0.1:2181"
        type => "logz"
        topic_id => "logz"
    }
}

filter {

    json {
        source => "message"
    }

The ruby code below strips the log type prefix and adds it as a tag.

http://stackoverflow.com/questions/24605331/logstash-grok-filter-name-fields-dynamically

ruby {
    code => "
        fieldArray = event['message'].split(': {')
        for field in fieldArray
            field = field.delete '{'
            field = field.delete '}'
            field = field.delete '"'
            #
            # This changes for ES 5.x
            # Adding tags after executing the ruby code filter
            #
            # 'tags' doesn't exist at first, so the log type is assigned as a tag.
            # The second time the for loop runs, tags is already defined, so
            # the message field is overwritten with the remaining message.
            # This removes the log type prefix and makes it possible for the
            # next ruby section to create key/value pairs.
            #
            if event['tags']
                event['message'] = field
            else
                event['tags'] = [ field ]
            end
        end
    "
}

The ruby code below splits the log message on commas and dynamically names fields based on their names in the log.

ruby {
    code => "
        fieldArray = event['message'].split(',')
        for field in fieldArray
            field = field.delete '"'
            result = field.split(':')
            event[result[0]] = result[1]
        end
    "
}
}

output { stdout { codec => "rubydebug" } }

Now I understand why that fails, but I still cannot fix my problem. The reason is that the Kafka input automatically uses the JSON codec, which changes the event object available in the Ruby code interpreter. There is no documentation on how the data is represented in this object.

In theory it should be as easy as something like:

ruby {
    code => "
        print event; # prints '2017-01-13T02:00:29.130Z %{host} %{message}'
        print '-------------------blank_space-------------------';
        print event['message']; # this is nil ?
        print event['source']; # this is nil ?

        event['message'].each do |key, value|
            event['tags'] = [ key ];
            event['message'] = value;
        end
    "
}

But this also fails because event seems to be a string. Where did the message data go? The resulting output still shows a JSON object as expected, but without the modifications.

Thank you for your help.


#2

The lines in bold font were meant to be comments.


(Andrew Cholakian) #3

I think that if you create a simple pipeline with just input { kafka { ... } } output { stdout { codec => rubydebug } } you'll be able to debug this problem easily. The real issue here seems to be some sort of failure to parse the actual data from Kafka. If you can post a follow-up with the output of that, I think it will make getting to the bottom of the nil error easier.
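Spelled out, that minimal debug pipeline would look something like the following (the zk_connect, topic_id, and type values are the ones from the config posted above):

```
input {
    kafka {
        zk_connect => "127.0.0.1:2181"
        type => "logz"
        topic_id => "logz"
    }
}

output {
    stdout { codec => rubydebug }
}
```

With no filters at all, the rubydebug output shows exactly what the Kafka input's codec produced.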


#4

Hello. Here is the output:

{
    "uniq_log_file1" => {
        "property0" => "zero",
        "property1" => "one",
        "property2" => "two",
        "property3" => "three"
    },
    "@version" => "1",
    "@timestamp" => "2017-01-13T01:44:06.177Z",
    "type" => "logz"
}

I want to flatten the "uniq_log_file1" object properties into the parent event and apply the object name as a tag.

The end result would be:

{
    "property0" => "zero",
    "property1" => "one",
    "property2" => "two",
    "property3" => "three",
    "@version" => "1",
    "@timestamp" => "2017-01-13T01:44:06.177Z",
    "type" => "logz",
    "tags" => [ "uniq_log_file1" ]
}

I need to do this dynamically, as "uniq_log_file1" is just one of 20+ log file names; that is the reason for using the ruby code. But the problem is I don't know how to access the event object's properties by reference. I thought event.each would work, but it does not.


#5

Here's an example showing how the Kafka plugin seems to change the LogStash::Event object. Both stdin and Kafka use the JSON codec, but they produce different Event objects. (I found the Event class defined in vendor/bundle/jruby/1.9/gems/logstash-core-event-2.3.4-java/lib/logstash/event.rb.)

With input stdin { codec => json }, feeding input at the shell using echo:

echo '{"uniq_log_file1": {"property0":"zero","property1":"one","property2":"two","property3":"three",}' | ./logstash -f logstash_kafka_debug.conf
Settings: Default pipeline workers: 16
Pipeline main started
-----------Start----------
2017-01-14T17:53:33.656Z svr {"uniq_log_file1": {"property0":"zero","property1":"one","property2":"two","property3":"three",}
-------------Next---------
{"uniq_log_file1": {"property0":"zero","property1":"one","property2":"two","property3":"three",}
--------------End---------
{
    "message" => "{"uniq_log_file1": {"property0":"zero","property1":"one","property2":"two","property3":"three",}",
    "tags" => [
        [0] "_jsonparsefailure"
    ],
    "@version" => "1",
    "@timestamp" => "2017-01-14T17:53:33.656Z",
    "host" => "svr"
}

With the Kafka input:

-----------Start----------
2017-01-14T17:48:26.360Z %{host} %{message}
-------------Next---------
nil
--------------End---------
{
    "uniq_log_file1" => {
        "property0" => "zero",
        "property1" => "one",
        "property2" => "two",
        "property3" => "three"
    },
    "@version" => "1",
    "@timestamp" => "2017-01-14T17:48:26.360Z",
    "type" => "logz"
}

The config file is below. When testing with Kafka, replace the stdin input with input { kafka { ... } }.

input {
    stdin {
        codec => json
    }
}

filter {
    ruby {
        code => "
            print '-----------Start----------\n';
            print event;
            print '-------------Next---------\n';
            print event['message'];
            print '--------------End---------\n';
        "
    }
}
output { stdout { codec => "rubydebug" } }


(Sandeepkanabar) #6

To iterate over the event with key/value pairs in Ruby code, you first need to convert the event to a hash using event.to_hash. To experiment with Ruby code, you can use the irb interactive console.
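As a plain-Ruby illustration of that advice (an ordinary hash stands in here for the result of event.to_hash; the field names are taken from the outputs earlier in this thread):

```ruby
# An ordinary hash standing in for event.to_hash on a decoded Kafka event.
event_hash = {
  'uniq_log_file1' => { 'property0' => 'zero', 'property1' => 'one' },
  '@version'       => '1',
  'type'           => 'logz'
}

# The hash (unlike the LogStash::Event itself) supports each with key/value pairs.
event_hash.each do |key, value|
  puts "Key: #{key}"
  puts "Value: #{value}"
end
```

The same iteration can be pasted into irb to check the logic before putting it inside a ruby filter.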


#7

For some reason I still cannot iterate over what acts like a hash. Printing event['uniq_log_file1'] works fine, but attempting to use event.each results in an error:

Ruby exception occurred: undefined method `each' for #&lt;LogStash::Event:0x31d01028&gt; {:level=>:error}

input {
    stdin {
        codec => json
    }
}
filter {
    ruby {
        code => "
            event.to_hash;
            event.each do |key, value|
                puts 'Key: ', key;
                puts 'Value: ', value;
            end
            puts 'Message: ', event['uniq_log_file1'];
        "
    }
}
output { stdout { codec => "rubydebug" } }


#8

Some progress.... this works:

ruby {
    code => "
        event['uniq_log_file1'].to_hash;
        puts 'Message: ', event['uniq_log_file1']['property0'];
        event['uniq_log_file1'].each do |key, value|
            puts 'Key: ', key;
            puts 'Value: ', value;
        end
    "
}

But my problem is that I don't know the name "uniq_log_file1" at runtime. Maybe I should build a large conditional to test for all possible log types.

Am I trying to solve this problem correctly? All I want to do is move the root key to a tag and flatten the object.

ex:

{"uniq_log_file1": {"property0":"zero","property1":"one","property2":"two","property3":"three"}}

Becomes:

{"tags":["uniq_log_file1"], "property0":"zero","property1":"one","property2":"two","property3":"three"}


#9

Thanks for the pointers. I've solved the problem:

filter {
    json {
        source => "message"
        target => "wtfyo"
    }

    ruby {
        code => "
            event['wtfyo'].each do |key, value|
                event.tag(key);
                value.each do |key2, val2|
                    event[key2] = val2;
                end
            end
            event.remove('wtfyo');
            event.remove('message');
        "
    }
}
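Outside Logstash, the same flatten-and-tag transformation can be sketched in plain Ruby, with an ordinary hash standing in for the LogStash::Event after the json filter has placed the parsed message under the 'wtfyo' target (Array#&lt;&lt; and Hash#delete stand in for the event.tag and event.remove calls of the Event API):

```ruby
# 'event' is an ordinary hash simulating the json-filtered LogStash::Event;
# the dynamic object name sits under the 'wtfyo' target field.
event = {
  'wtfyo' => {
    'uniq_log_file1' => {
      'property0' => 'zero',
      'property1' => 'one'
    }
  },
  'type' => 'logz',
  'tags' => []
}

event['wtfyo'].each do |key, value|
  event['tags'] << key          # the dynamic object name becomes a tag
  value.each do |key2, val2|    # flatten its properties into the parent
    event[key2] = val2
  end
end
event.delete('wtfyo')           # drop the now-redundant wrapper
```

After this runs, the hash holds the flattened properties at the top level with 'tags' set to the former root key, matching the desired output shown earlier in the thread.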


(system) #10

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.