Put value of nested field into new field

Hello there,

I'm trying to put the value of a nested field into a new field with the help of Logstash filters. I thought the mutate filter would be suitable for that.

So here's what I'm trying to do:

Log4j turns each log line into JSON, which arrives in Logstash like this:
{"@timestamp":"2018-03-15T07:55:09.493Z","fieldA":"someText","mdc":{"LoggingContext":{"nestedField1":"Value1","nestedField2":value2}}}

When I try to find the document in Elasticsearch the output looks like this:
"_index": "application_log-16.03.2018",
"_type": "doc",
"_id": "1sQ4PWIBDfzRTwMRiiAQ",
"_score": 0.039220713,
"_source": {
"mdc": {
"LoggingContext": {
"nestedField1": "Value1",
"nestedField2": "Value2"
}
},
"tags": [
"FNKT",
"beats_input_codec_plain_applied"
],
"type": "application_log",
"session_id": "1752B3D812159649067F27A236A8E874.be01",
"source_host": "be01",
"beat": {
"hostname": "LO01",
"version": "6.1.0",
"name": "LO01"
},
}

For better handling in Kibana and easier queries in Elasticsearch, I want to put the values of nestedField1 and nestedField2 into new top-level fields, like it is done with "type" or "session_id".

I tried the following mutate filters, but none of them seem to work:

mutate {
  add_field => { "nestedField1" => "%{[mdc][LoggingContext][nestedField1]}" }
}

mutate {
  add_field => { "nestedField1" => "%{[mdc][0][LoggingContext][nestedField1]}" }
}

mutate {
  add_field => { "nestedField1" => "%{mdc.LoggingContext.nestedField1}" }
}

add_field does add the field, but the value is wrong. When I look into Kibana, nestedField1 just contains the literal string "%{[mdc][0][LoggingContext][nestedField1]}".

Show your complete filter configuration.

Thank you for the quick reply, Magnus!

My configuration looks like this:

input {
  beats {
    host => "192.168.141.214"
    port => 5044
    type => "application_log"
  }
}

filter {

  if [message] =~ "callBegin" {
    grok {
      match => { "message" => "\(SessionId=%{BASE16FLOAT:session_id}\) %{EMAILADDRESS:username} callBegin %{WORD:module} %{JAVACLASS:java_class}" }
    }
  } else if [message] =~ "callEnd" {
    grok {
      match => { "message" => "\(SessionId=%{BASE16FLOAT:session_id}\) %{EMAILADDRESS:username} callEnd %{WORD:module} %{JAVACLASS:java_class} %{BASE10NUM:duration_in_ms}ms" }
    }
  }

  mutate {
    add_field => { "newField1" => "%{[mdc][LoggingContext][nestedField1]}" }
  }

}

output {

  elasticsearch {
    hosts => ["192.168.141.215:9200"]
    index => ["application_log-%{+dd.MM.YYYY}"]
  }

  stdout {
    codec => rubydebug
  }

}

{"@timestamp":"2018-03-15T07:55:09.493Z","fieldA":"someText","mdc":{"LoggingContext":{"nestedField1":"Value1","nestedField2":value2}}}

This isn't valid JSON. Are you sure the value2 near the end isn't double-quoted?

If I make that correction things work fine:

$ cat test.config 
input { stdin { codec => json_lines } }
output { stdout { codec => rubydebug } }
filter {

  if [message] =~ "callBegin" {
    grok {
      match => { "message" => "\(SessionId=%{BASE16FLOAT:session_id}\) %{EMAILADDRESS:username} callBegin %{WORD:module} %{JAVACLASS:java_class}" }
    }
  } else if [message] =~ "callEnd" {
    grok {
      match => { "message" => "\(SessionId=%{BASE16FLOAT:session_id}\) %{EMAILADDRESS:username} callEnd %{WORD:module} %{JAVACLASS:java_class} %{BASE10NUM:duration_in_ms}ms" }
    }
  }

  mutate {
    add_field => { "newField1" => "%{[mdc][LoggingContext][nestedField1]}" }
  }

}
$ cat data 
{"@timestamp":"2018-03-15T07:55:09.493Z","fieldA":"someText","mdc":{"LoggingContext":{"nestedField1":"Value1","nestedField2":"value2"}}}
$ /opt/logstash/bin/logstash -f test.config < data
Settings: Default pipeline workers: 8
Pipeline main started
{
    "@timestamp" => "2018-03-15T07:55:09.493Z",
        "fieldA" => "someText",
           "mdc" => {
        "LoggingContext" => {
            "nestedField1" => "Value1",
            "nestedField2" => "value2"
        }
    },
      "@version" => "1",
          "host" => "lnxolofon",
     "newField1" => "Value1"
}
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}

You're right. I forgot the quotes, but that didn't solve the problem.

Your example also works fine for me when the file "data" is the input on stdin. But my input is Filebeat, and when I use it to read a file and send it to Logstash, the mutate filter fails... Could Filebeat be the problem here?

Here's my setup:

Filebeat config:

filebeat.prospectors:
- type: log
  paths:
    - "/var/log/logstash-test.log"

output.logstash:
  hosts: ["192.168.141.214:5044"]

Content of the file "logstash-test.log":
{"@timestamp":"2018-03-15T08:55:09.493Z","class":"base.application.authorization.Checker","mdc":{"LoggingContext":{"targetgroup":"blue","order":"9999999","companynumber":1,"wjBegin":""}}}

Logstash config:

input {

  beats {
    host => "192.168.141.214"
    port => 5044
    type => "application_log"
  }

}

filter {

  if [message] =~ "callBegin" {
    grok {
      match => { "message" => "\(SessionId=%{BASE16FLOAT:session_id}\) %{EMAILADDRESS:benutzername} callBegin %{WORD:modul} %{JAVACLASS:java_class}" }
    }
  } else if [message] =~ "callEnd" {
    grok {
      match => { "message" => "\(SessionId=%{BASE16FLOAT:session_id}\) %{EMAILADDRESS:benutzername} callEnd %{WORD:modul} %{JAVACLASS:java_class} %{BASE10NUM:dauer_in_ms}ms" }
    }
  }

  mutate {
    add_field => { "firmennummer" => "%{[mdc][LoggingContext][firmennummer]}" }
  }

  json {
    source => "message"
  }

}

output {

  #elasticsearch {
  #  hosts => ["192.168.141.215:9200"]
  #  index => ["application_log-%{+dd.MM.YYYY}"]
  #}

  stdout {
    codec => rubydebug
  }

}

As a result, the rubydebug output gives me this:

{
            "type" => "application_log",
         "message" => "{\"@timestamp\":\"2018-03-15T08:55:09.493Z\",\"class\":\"base.application.authorization.Checker\",\"mdc\":{\"LoggingContext\":{\"targetgroup\":\"blue\",\"order\":\"9999999\",\"companynumber\":1,\"wjBegin\":\"\"}}}",
    "companynumber" => "%{[mdc][LoggingContext][companynumber]}",
             "mdc" => {
        "LoggingContext" => {
            "order" => "9999999",
                       "targetgroup" => "blue",
                         "wjBegin" => "",
                     "companynumber" => 1
        }
    },
          "offset" => 213,
      "prospector" => {
        "type" => "log"
    },
           "class" => "base.application.authorization.Checker",
            "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
          "source" => "/var/log/logstash-test.log",
      "@timestamp" => 2018-03-15T08:55:09.493Z,
            "beat" => {
            "name" => "LO01",
        "hostname" => "LO01",
         "version" => "6.1.0"
    },
            "host" => "LO01",
        "@version" => "1"
}

You can't have the json filter at the end. Filters are evaluated in order. But why not have Filebeat parse the JSON in the file?

I removed the json filter, but that didn't help either. Should the mutate filter be the first filter?

How can I get Filebeat to parse the JSON? Does it do that by default, or do I have to change my config?

Should the mutate filter be the first filter?

The json filter needs to go before any filters that depend on the parsed JSON data.
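
In your case that would look roughly like this (a sketch based on your config above; the only change is the order, and I've used companynumber since that's the field your sample line actually contains):

filter {

  # Parse the JSON in the message field first ...
  json {
    source => "message"
  }

  # ... so that filters further down can reference the parsed fields.
  mutate {
    add_field => { "companynumber" => "%{[mdc][LoggingContext][companynumber]}" }
  }

}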

How can I get Filebeat to parse the JSON? Does it do that by default, or do I have to change my config?

Have you checked the Filebeat documentation?
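
For what it's worth, the log prospector has built-in JSON decoding options; something like this in filebeat.yml should be close (a sketch, untested, so double-check the option names against the docs for your Filebeat version):

filebeat.prospectors:
- type: log
  paths:
    - "/var/log/logstash-test.log"
  # Decode each line as JSON and put the decoded keys at the top level of the event.
  json.keys_under_root: true
  # Add an error key to the event instead of failing silently on bad lines.
  json.add_error_key: true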

I put the json filter first in my configuration.

I've also checked the documentation now, on this page: Filter and enhance data with processors | Filebeat Reference [8.11] | Elastic.

It seems to me that a Filebeat processor can do the same thing as the json filter does in Logstash.
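
If I read that page correctly, something like this would be the processor-based equivalent (my sketch from the docs, not tested yet):

processors:
  - decode_json_fields:
      fields: ["message"]
      # An empty target merges the decoded keys into the root of the event.
      target: ""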

I ran some more tests with my Logstash configuration, and the mutate filter now works for string values, just like in your configuration above. But when a field contains a number without quotes in the JSON, I end up with my old problem.

You can see what I mean in the rubydebug output above, in the companynumber field. In the document, the value of companynumber is "%{[mdc][LoggingContext][companynumber]}", but it should have the value shown in the nested field companynumber under LoggingContext under mdc.

Works fine here, except that the companynumber field is a string.

$ cat test.config 
input { stdin { codec => json_lines } }
output { stdout { codec => rubydebug } }
filter {
  mutate {
    add_field => {
      "companynumber" => "%{[mdc][LoggingContext][companynumber]}"
    }
  }
}
$ cat data 
{"@timestamp":"2018-03-15T08:55:09.493Z","class":"base.application.authorization.Checker","mdc":{"LoggingContext":{"targetgroup":"blue","order":"9999999","companynumber":1,"wjBegin":""}}}
$ ~/logstash/logstash-6.1.0/bin/logstash -f test.config < data
Sending Logstash's logs to /home/magnus/logstash/logstash-6.1.0/logs which is now configured via log4j2.properties
[2018-03-20T19:13:42,900][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/magnus/logstash/logstash-6.1.0/modules/fb_apache/configuration"}
[2018-03-20T19:13:42,914][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/magnus/logstash/logstash-6.1.0/modules/netflow/configuration"}
[2018-03-20T19:13:43,321][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-03-20T19:13:43,930][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.1.0"}
[2018-03-20T19:13:44,408][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-20T19:13:47,550][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x299555fb run>"}
[2018-03-20T19:13:47,625][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
[2018-03-20T19:13:47,728][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
{
       "@timestamp" => 2018-03-15T08:55:09.493Z,
    "companynumber" => "1",
         "@version" => "1",
              "mdc" => {
        "LoggingContext" => {
              "targetgroup" => "blue",
            "companynumber" => 1,
                  "wjBegin" => "",
                    "order" => "9999999"
        }
    },
            "class" => "base.application.authorization.Checker",
             "host" => "bertie"
}
[2018-03-20T19:13:48,446][INFO ][logstash.pipeline        ] Pipeline terminated {"pipeline.id"=>"main"}
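
add_field always produces a string, because %{...} references are plain string substitutions. If you need a real number in Elasticsearch, a convert afterwards should do it (a sketch):

mutate {
  # Turn the string "1" back into the integer 1.
  convert => { "companynumber" => "integer" }
}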
