Help with parsing Kafka key

Hello, I currently use [@metadata][kafka][key] to set the document ID of all my messages from the kafka input. I also want a way to route each message to the appropriate index. There are two options I am considering:

  1. overload the key with "index_name:document_id" and parse this somehow in logstash
  2. add the index name/pattern into the message itself as a field, and set the index with this field value

I was not able to find a resource on how to parse the key into multiple parts and then reference those parts in the index/document_id settings of the output, and I don't want to store the index name inside the document in Elasticsearch. I am OK with either approach if it's possible, and open to suggestions for an alternative. Thanks!

Just put the index name into [@metadata][index] and sprintf reference it in the output, very similar to what you are doing with the key.
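Something along these lines, for example (untested, and the index value here is just a placeholder for however you derive it):

filter {
  mutate {
    # set the target index however you derive it, then reference it in the output
    add_field => { "[@metadata][index]" => "some-index-name" }
  }
}

output {
  elasticsearch {
    index => "%{[@metadata][index]}"
    document_id => "%{[@metadata][kafka][key]}"
  }
}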

Thanks for the response, but I am not quite sure how I would set it into metadata. If I set it inside mutate, I would need to fetch it from some source, such as the kafka key, but then I'd need to parse it into tokens so I can extract the index name portion of the key. I don't know how to explicitly do this step (parsing the kafka key).

Or do you mean that 'metadata' is a special keyword, and that if I set a metadata field in the kafka message itself I can get access to it?

e.g. { "field1" : "val1", "metadata" : { "index" : "idx_pttrn" } }

I'm not too familiar with the metadata field. I just know that certain fields like the kafka info are given to me automatically based on the decorate_events flag. But the index name has to be set manually by me somewhere, whether it comes from extracting it out of the kafka message key or from somewhere else.
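For context, my input is roughly this (an untested paraphrase of my config; the host and topic are placeholders), where decorate_events is what gives me the [@metadata][kafka][*] fields:

input {
  kafka {
    bootstrap_servers => "localhost:9092"   # placeholder
    topics => ["my_topic"]                  # placeholder
    decorate_events => true                 # adds [@metadata][kafka][key], [@metadata][kafka][topic], etc.
  }
}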

Further example:

filter {
  mutate {
    add_field => { "[@metadata][index]" => "[@metadata][kafka][key] <~ how do I parse inside here????" }
  }
}

Hi

I have written a blog post on how I do this. Have a read and see if you can use it.

https://www.securitydistractions.com/2019/03/23/simple-kafka-and-elasticsearch-integration/

Kim

Thank you. I was hoping to make use of mutate/split on the kafka key, but I couldn't find a good example. Instead I went with injecting an extra field into the message and appending its value to the index name. Here are some snippets:

...
filter {
  mutate {
    # copy the injected suffix into metadata so it is not stored with the document...
    add_field => { "[@metadata][idxSuffix]" => "%{suffix}" }
    # ...then drop the original field from the event
    remove_field => [ "suffix" ]
  }
}
...
output {
  elasticsearch {
    ...
    index => "myindex-%{[@metadata][idxSuffix]}"   # Elasticsearch index names must be lowercase
  }
}

I might revisit some metadata parsing eventually, but time constraints had me choosing a faster path :slight_smile:
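For when I do revisit it, I think the mutate/split version would look roughly like this (untested sketch, assuming a key of the form "index_name:document_id"):

filter {
  # split the key on ":" so it becomes an array, then copy the parts into metadata
  mutate { split => { "[@metadata][kafka][key]" => ":" } }
  mutate {
    add_field => {
      "[@metadata][index]"  => "%{[@metadata][kafka][key][0]}"
      "[@metadata][doc_id]" => "%{[@metadata][kafka][key][1]}"
    }
  }
}

output {
  elasticsearch {
    ...
    index => "%{[@metadata][index]}"
    document_id => "%{[@metadata][doc_id]}"
  }
}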
