Struggling with field substitution syntax

I'm using the kafka input plugin to create events from Kafka messages. This all works fine.

The next step: many of the Kafka message payloads are in fact JSON, and I want to parse that JSON into the event so that I can search on individual fields once it reaches Elasticsearch.

Now, I don't have complete control over the schemas for the various JSON messages, and it's possible that some clever person will have fields in different topics with the same name but different data types. In that case, letting Elasticsearch autogenerate the mappings doesn't work, because the types conflict.

So, I want to use the "target" setting of the json filter to namespace the output by topic. I already have the topic in [kafka][topic], set by:

    mutate
    {
        add_field =>
        {
            "[kafka][topic]" => "%{[@metadata][kafka][topic]}"
        }
    }

which works fine. I then want the output from the json filter to look like this (where the topic name is "example-topic"):

"json": { "example-topic": { ... parsed message ... } }

I have tried a number of variations on

    json
    {
        source       => "message"
        target       => "[json][%{[kafka][topic]}]"
    }

but none of them gives a sensible result. What, please, is the correct set of quotes, percents, dots, and brackets of various shapes to get the substitution I want?

Even

    json
    {
        source       => "message"
        target       => "%{[kafka][topic]}"
    }

doesn't work, even though it is exactly the same syntax that works in add_field. The output is:

| Field | Value |
|---|---|
| %{.kafka.topic.}.asset-id | 9f9801cc-28a9-4e9b-8167-5731012fee12 |
| %{.kafka.topic.}.connected | false |
| %{.kafka.topic.}.router | er-dev6 |
| %{.kafka.topic.}.timestamp | 2018-11-02T13:35:28.610Z |

What's going on here please?
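
Judging by that output, "target" appears to be taken literally as a field name rather than run through %{...} substitution, though I haven't confirmed that. For reference, here is a minimal sketch of a workaround I'm considering: parse into a fixed temporary field, then move it under the topic name with a ruby filter. The temporary field name [@metadata][parsed] is my own invention, and this assumes the topic name contains no square brackets.

```
filter {
    # Parse into a fixed location first; [@metadata] fields are not sent to outputs.
    json {
        source => "message"
        target => "[@metadata][parsed]"
    }

    # Build the destination field reference at runtime from the topic name.
    ruby {
        code => '
            topic  = event.get("[kafka][topic]")
            parsed = event.get("[@metadata][parsed]")
            if topic && parsed
                event.set("[json][#{topic}]", parsed)
            end
        '
    }
}
```

I'd still prefer to do this with the json filter alone if "target" can be made to accept a substitution.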
