Logstash event ruby code

Hello

We are now setting our own fields with Ruby code in Logstash.

There are three Binary fields.

We have two questions:

(1) The logstash setting code is:

event.set('[Binary1]', event.get('[event_data][Binary]')[0..7])

event.set('[Binary2]', event.get('[event_data][Binary]')[8..15])

event.set('[Binary3]', event.get('[event_data][Binary]')[16..23])

If Binary1 is not equal to "00000000", how can we use event.remove('[Binary1]') to undo this event.set?

I tried if [Binary1] == "00000000" { event.remove('[Binary1]') }, but it does not work.

(2) If Binary1 is not equal to "00000000", we want to send the data to Elasticsearch.

Each document on the Kibana page should then contain only the Binary1 field, not all three Binary fields.

We want to split the log data by the Binary fields.

If we have three Binary fields, we want to see three documents on the Kibana page.

Can we configure Logstash to do this?

I tried if [Binary1] == "00000000" { event.remove('[Binary1]') }, but it does not work.

Use event.get to read field values. Secondly, Ruby doesn't use curly braces this way. Better yet, do this operation outside the ruby filter: conditionally removing a field depending on its value is doable without a ruby filter.
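For example, this kind of conditional removal can be done in plain Logstash configuration with a mutate filter instead of ruby (a sketch, assuming the Binary1 field set earlier in the thread):

```
filter {
  if [Binary1] == "00000000" {
    mutate {
      remove_field => ["Binary1"]
    }
  }
}
```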

Thanks for your reply.

Is there any method to solve question 2?

I tried writing three output sections in the Logstash conf file, and also splitting the configuration into three conf files.

But the output always contains all three Binary fields in one document view on Kibana.

I cannot split it into three messages on Kibana.

Add the three values to an array, then use a split filter on that array.

Hello magnusbaeck

We can now split the event by the Binary rows.

The Logstash config is:

filter {
  if [message] == "XXXXX" {
    ruby {
      code => "
        event.set('[Binary1]', event.get('[event_data][Binary]')[0..7])
        event.set('[Binary2]', event.get('[event_data][Binary]')[8..15])
        event.set('[Binary3]', event.get('[event_data][Binary]')[16..23])
      "
    }

    mutate {
      merge => ["Binary1", "Binary2"]
    }
    mutate {
      merge => ["Binary1", "Binary3"]
    }
  }
}

filter {
  if [message] == "XXXXX" {
    split {
      field => "Binary1"
      remove_field => ["Binary2", "Binary3"]
    }
  }
}

Is there any method to add an id number or a message showing the original binary location?

I used the add_field option, but I don't know how to get the id number.

Is there any method to add an id number or a message showing the original binary location?

Id number of what? What do you mean by "binary location"?

If you show an example rather than describing what you want, it'll probably be easier to understand what you mean.

Hello magnusbaeck

With our Logstash settings, we get three event documents in Kibana.

The event JSON is:

event1:

{
  "_index": "wineventlog-2017-11-24",
  "_type": "wineventlog",
  "_id": "XXXXXXXXXXXXX",
  "_version": 1,
  "_score": null,
  "_source": {
    "computer_name": "XXXXXX",
    "keywords": [
      "XX"
    ],
    "log_name": "Application",
    "level": "info",
    "record_number": "38868",
    "event_data": {
      "param1": "XXXXXXX",
      "Binary": "000000000000000100000002"
    },
    "message": "XXXXXXX",
    "type": "wineventlog",
    "opcode": "info",
    "tags": [
      "beats_input_codec_plain_applied",
      "_grokparsefailure"
    ],
    "@timestamp": "2017-11-24T08:13:50.000Z",
    "event_id": 1,
    "Binary1": "00000000",
    "@version": "1",
    "beat": {
      "name": "XXXXXX",
      "hostname": "XXXXXX",
      "version": "5.6.3"
    },
    "host": "XXXXXXX",
    "source_name": "XXXXXXX"
  },
  "fields": {
    "@timestamp": [
      1511511230000
    ]
  },
  "highlight": {
    "message": [
      "(XXXXXXXXXXXXXXXXXXXXXXXX)"
    ]
  },
  "sort": [
    1511511230000
  ]
}

event2:

{
  "_index": "wineventlog-2017-11-24",
  "_type": "wineventlog",
  "_id": "XXXXXXXXXXXXX",
  "_version": 1,
  "_score": null,
  "_source": {
    "computer_name": "XXXXXX",
    "keywords": [
      "XX"
    ],
    "log_name": "Application",
    "level": "info",
    "record_number": "38868",
    "event_data": {
      "param1": "XXXXXXX",
      "Binary": "000000000000000100000002"
    },
    "message": "XXXXXXX",
    "type": "wineventlog",
    "opcode": "info",
    "tags": [
      "beats_input_codec_plain_applied",
      "_grokparsefailure"
    ],
    "@timestamp": "2017-11-24T08:13:50.000Z",
    "event_id": 1,
    "Binary1": "00000001",
    "@version": "1",
    "beat": {
      "name": "XXXXXX",
      "hostname": "XXXXXX",
      "version": "5.6.3"
    },
    "host": "XXXXXXX",
    "source_name": "XXXXXXX"
  },
  "fields": {
    "@timestamp": [
      1511511230000
    ]
  },
  "highlight": {
    "message": [
      "(XXXXXXXXXXXXXXXXXXXXXXXX)"
    ]
  },
  "sort": [
    1511511230000
  ]
}

event3:

{
  "_index": "wineventlog-2017-11-24",
  "_type": "wineventlog",
  "_id": "XXXXXXXXXXXXX",
  "_version": 1,
  "_score": null,
  "_source": {
    "computer_name": "XXXXXX",
    "keywords": [
      "XX"
    ],
    "log_name": "Application",
    "level": "info",
    "record_number": "38868",
    "event_data": {
      "param1": "XXXXXXX",
      "Binary": "000000000000000100000002"
    },
    "message": "XXXXXXX",
    "type": "wineventlog",
    "opcode": "info",
    "tags": [
      "beats_input_codec_plain_applied",
      "_grokparsefailure"
    ],
    "@timestamp": "2017-11-24T08:13:50.000Z",
    "event_id": 1,
    "Binary1": "00000002",
    "@version": "1",
    "beat": {
      "name": "XXXXXX",
      "hostname": "XXXXXX",
      "version": "5.6.3"
    },
    "host": "XXXXXXX",
    "source_name": "XXXXXXX"
  },
  "fields": {
    "@timestamp": [
      1511511230000
    ]
  },
  "highlight": {
    "message": [
      "(XXXXXXXXXXXXXXXXXXXXXXXX)"
    ]
  },
  "sort": [
    1511511230000
  ]
}

Now we want to add a field to each event:

event1:

{
  "_index": "wineventlog-2017-11-24",
  "_type": "wineventlog",
  "_id": "XXXXXXXXXXXXX",
  "_version": 1,
  "_score": null,
  "_source": {
    "computer_name": "XXXXXX",
    "keywords": [
      "XX"
    ],
    "log_name": "Application",
    "level": "info",
    "record_number": "38868",
    "event_data": {
      "param1": "XXXXXXX",
      "Binary": "000000000000000100000002"
    },
    "message": "XXXXXXX",
    "type": "wineventlog",
    "opcode": "info",
    "tags": [
      "beats_input_codec_plain_applied",
      "_grokparsefailure"
    ],
    "@timestamp": "2017-11-24T08:13:50.000Z",
    "event_id": 1,
    "Binary1": "00000000",
    "location": "1",
    "@version": "1",
    "beat": {
      "name": "XXXXXX",
      "hostname": "XXXXXX",
      "version": "5.6.3"
    },
    "host": "XXXXXXX",
    "source_name": "XXXXXXX"
  },
  "fields": {
    "@timestamp": [
      1511511230000
    ]
  },
  "highlight": {
    "message": [
      "(XXXXXXXXXXXXXXXXXXXXXXXX)"
    ]
  },
  "sort": [
    1511511230000
  ]
}

event2:

{
  "_index": "wineventlog-2017-11-24",
  "_type": "wineventlog",
  "_id": "XXXXXXXXXXXXX",
  "_version": 1,
  "_score": null,
  "_source": {
    "computer_name": "XXXXXX",
    "keywords": [
      "XX"
    ],
    "log_name": "Application",
    "level": "info",
    "record_number": "38868",
    "event_data": {
      "param1": "XXXXXXX",
      "Binary": "000000000000000100000002"
    },
    "message": "XXXXXXX",
    "type": "wineventlog",
    "opcode": "info",
    "tags": [
      "beats_input_codec_plain_applied",
      "_grokparsefailure"
    ],
    "@timestamp": "2017-11-24T08:13:50.000Z",
    "event_id": 1,
    "Binary1": "00000001",
    "location": "2",
    "@version": "1",
    "beat": {
      "name": "XXXXXX",
      "hostname": "XXXXXX",
      "version": "5.6.3"
    },
    "host": "XXXXXXX",
    "source_name": "XXXXXXX"
  },
  "fields": {
    "@timestamp": [
      1511511230000
    ]
  },
  "highlight": {
    "message": [
      "(XXXXXXXXXXXXXXXXXXXXXXXX)"
    ]
  },
  "sort": [
    1511511230000
  ]
}

event3:

{
  "_index": "wineventlog-2017-11-24",
  "_type": "wineventlog",
  "_id": "XXXXXXXXXXXXX",
  "_version": 1,
  "_score": null,
  "_source": {
    "computer_name": "XXXXXX",
    "keywords": [
      "XX"
    ],
    "log_name": "Application",
    "level": "info",
    "record_number": "38868",
    "event_data": {
      "param1": "XXXXXXX",
      "Binary": "000000000000000100000002"
    },
    "message": "XXXXXXX",
    "type": "wineventlog",
    "opcode": "info",
    "tags": [
      "beats_input_codec_plain_applied",
      "_grokparsefailure"
    ],
    "@timestamp": "2017-11-24T08:13:50.000Z",
    "event_id": 1,
    "Binary1": "00000002",
    "location": "3",
    "@version": "1",
    "beat": {
      "name": "XXXXXX",
      "hostname": "XXXXXX",
      "version": "5.6.3"
    },
    "host": "XXXXXXX",
    "source_name": "XXXXXXX"
  },
  "fields": {
    "@timestamp": [
      1511511230000
    ]
  },
  "highlight": {
    "message": [
      "(XXXXXXXXXXXXXXXXXXXXXXXX)"
    ]
  },
  "sort": [
    1511511230000
  ]
}

Is it possible?

Okay. So right now you build up an array that looks like this:

["000000000", "1111111111111", "2222222222222"]

Change your ruby filter to produce this instead:

[{"binary": "000000000", "location": "1"}, {"binary": "1111111111111", "location": "2"}, {"binary": "2222222222222", "location": "3"}]

That way, after the split the field in question will contain an object. The binary and location subfields can then be moved to the top level of the event. The Ruby code could look like this:

event.set('binary',
    [
        {"binary" => event.get('[event_data][Binary]')[0..7], "location" => "1"},
        {"binary" => event.get('[event_data][Binary]')[8..15], "location" => "2"},
        {"binary" => event.get('[event_data][Binary]')[16..23], "location" => "3"}
    ])
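As a sanity check outside Logstash, the same slicing logic can be run in plain Ruby (the sample Binary value is taken from the events above; a local variable stands in for event.get):

```ruby
# Cut a 24-character Binary string into three 8-character chunks,
# tagging each chunk with its 1-based location.
binary = "000000000000000100000002"  # sample value from the events above

result = binary.chars.each_slice(8).each_with_index.map do |chunk, i|
  { "binary" => chunk.join, "location" => (i + 1).to_s }
end
```

After the split filter, each of the three hashes becomes the value of the binary field in its own event.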

Hello magnusbaeck

I change the filter to :

filter {

grok {
match => [ "message", "%{COMMONAPACHELOG}" ]
}
if [message]=="XXXXX"
{
ruby {
code => "
event.set('binary',
[
{"binary" => event.get('[event_data][Binary]')[0..7]), "location" => "1"},
{"binary" => event.get('[event_data][Binary]')[8..15]), "location" => "2"},
{"binary" => event.get('[event_data][Binary]')[16..23]), "location" => "3"}
])

"
}
}

}

filter {

if [message]=="XXXXX"
{
split {
field => "binary"
}
}

}

The Logstash log shows:

[ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Expected one of #, {, } at line 18, column 11 (byte 207) after filter {\n\n grok {\n match => [ "message", "%{COMMONAPACHELOG}" ]\n }\nif [message]=="(INTEGRATION)"\n{\nruby {\n code => "\nevent.set('binary', \n [\n {""}

Is there an issue with the event.set form?

The Logstash version is 5.6.4 on Ubuntu.


You can't use double quotes inside the ruby code block. Replace them with single quotes.

Hello magnusbaeck

The filter is:

filter {

  grok {
    match => [ "message", "%{COMMONAPACHELOG}" ]
  }

  if [message] == "XXXXX" {
    ruby {
      code => "
        event.set('binary',
          [
            {'binary' => event.get('[event_data][Binary]')[0..7], 'location' => '1'},
            {'binary' => event.get('[event_data][Binary]')[8..15], 'location' => '2'},
            {'binary' => event.get('[event_data][Binary]')[16..23], 'location' => '3'}
          ])
      "
    }

    split {
      field => "binary"
    }
  }
}

The event log shows no error message.

But I cannot see the data in Kibana.

Logstash does not seem to be converting the data.

Always use a stdout { codec => rubydebug } output while you're debugging. Once you're happy with what the events look like you can enable the elasticsearch output again.
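A minimal debugging output section could look like this (a sketch; use it in place of, or alongside, the elasticsearch output while debugging):

```
output {
  stdout { codec => rubydebug }
}
```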

Hello magnusbaeck

Finally I set the Logstash config to:

ruby {
  code => "
    event.set('[Binary1]', event.get('[event_data][Binary]')[0..7]   + '_1')
    event.set('[Binary2]', event.get('[event_data][Binary]')[8..15]  + '_2')
    event.set('[Binary3]', event.get('[event_data][Binary]')[16..23] + '_3')
  "
}
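Outside Logstash, the suffixing can be verified in plain Ruby (the sample value is taken from the events above; a local variable stands in for event.get):

```ruby
# Slice the sample Binary string and append a position suffix to each
# chunk, mirroring the ruby filter above.
binary  = "000000000000000100000002"  # sample value from the events above
binary1 = binary[0..7]   + '_1'
binary2 = binary[8..15]  + '_2'
binary3 = binary[16..23] + '_3'
```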

And then we use Kibana scripted fields to do the rest.

Thank you very much for your help.
