Making decisions on the basis of message keys


(Faraz Mateen) #1

I am reading messages from a Kafka topic and pushing them to Logstash. It's pretty straightforward for simple JSON, but now I want to update my packet structure. Updated messages have the following structure:


{
  "indexname": string,
  "clientname": string,
  "data": {key1: value1, key2: value2, ..... , key3: value3, "datetime":"2017-09-08 11:11:11"}
}

Now, I want to choose my index on the basis of the "indexname" key, but I only want to push the JSON under the "data" key to Elasticsearch. Also, there is a datetime field inside the "data" JSON that I am using in the index name.

Currently, with non-nested JSON, my config looks like this:


input {
  kafka {
    bootstrap_servers => "localhost:9092"
    type => "redis-input"
    topics => ["test-topic"]
    codec => "json"
    consumer_threads => 5
    group_id => "group_id"
    auto_offset_reset => "latest"
  }
}

filter {
   date {
      # The field reference must be a quoted string
      match => [ "[data][datetime]", "UNIX", "UNIX_MS", "dd/MM/yyyy:HH:mm:ss Z", "yyyy-MM-dd HH:mm:ss" ]
   }
}

output {
   elasticsearch {
      hosts => ["127.0.0.1"]
      index => "clientindex-%{+yyyy}-%{+MM}"
   }
}

Now I want to replace "clientindex" in the output block with whatever indexname I get in the message, and then forward only the contents of the "data" key.

Can anyone suggest a filter that would help me achieve this?

Any help will be appreciated.

--

Faraz Mateen


(Christian Dahlqvist) #2

If you know this field is always set and contains only valid index name prefixes (you may want to add logic to check this), you can create an output block like this:

output {
   elasticsearch {
      hosts => ["127.0.0.1"]
      index => "%{[indexname]}-%{+yyyy}-%{+MM}"
   }
}
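
The check mentioned above could be sketched with a conditional around the output. This is only an illustration under assumptions: the regular expression is a deliberately conservative guess at what a safe index name prefix looks like (lowercase letters, digits, and a few separators), and the fallback index name "unknown" is hypothetical.

```
output {
   # Use indexname only if the field exists and looks like a safe prefix
   # (assumed pattern: lowercase alphanumerics plus "_", "." and "-")
   if [indexname] and [indexname] =~ /^[a-z0-9][a-z0-9_.-]*$/ {
      elasticsearch {
         hosts => ["127.0.0.1"]
         index => "%{[indexname]}-%{+yyyy}-%{+MM}"
      }
   } else {
      # Hypothetical fallback index for events with a missing or invalid indexname
      elasticsearch {
         hosts => ["127.0.0.1"]
         index => "unknown-%{+yyyy}-%{+MM}"
      }
   }
}
```

Routing bad events to a separate index rather than dropping them keeps them available for inspection.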

(Faraz Mateen) #3

@Christian_Dahlqvist
Thanks. That works for index naming.

How can I extract only one key from the message and send it to the output? For example, from the sample packet above, I only want to send the contents under the "data" key to Elasticsearch.
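
One possible approach, offered as a sketch rather than a confirmed answer from this thread: move "indexname" into [@metadata] (which Logstash does not send to outputs), copy the keys under "data" to the top level of the event with a ruby filter, and remove the wrapper fields. It assumes a Logstash version with the event.get/event.set API (5.x or later) and that "data" is always a JSON object. Logstash's own @timestamp and @version fields would still be indexed.

```
filter {
   mutate {
      # Keep indexname available for the output without indexing it
      rename => { "indexname" => "[@metadata][indexname]" }
   }
   ruby {
      # Promote every key under "data" to the top level, then drop the wrapper
      code => '
         data = event.get("data")
         if data.is_a?(Hash)
            data.each { |k, v| event.set(k, v) }
            event.remove("data")
         end
      '
   }
   mutate {
      remove_field => ["clientname"]
   }
}

output {
   elasticsearch {
      hosts => ["127.0.0.1"]
      index => "%{[@metadata][indexname]}-%{+yyyy}-%{+MM}"
   }
}
```

If a date filter on "[data][datetime]" is used, it would need to run before the ruby filter, since afterwards the field is simply "datetime".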