Input from Kafka to Logstash

Hi All,

My architecture is as follows:

filebeat -> kafka -> logstash -> elastic

I am successfully pushing logs from Filebeat to Kafka. Each message in the Kafka topic contains something like this:

"log":{"file":{"path":"/mnt/volume0/XXXX-v2-free-service/services/free-v-2-0-13/namesrv/logs/name-srv-express-info1.log"},"offset":4501449},"level":"info"}

My question: when using the Kafka input in Logstash, can I filter based on the value of [log][file][path]? The path contains a folder named free-v-2-0-13, and I need to add a field called version holding that folder name. The folder name changes dynamically.
If this can be done, please explain how.

Any help would be really appreciated.

Can anyone help me with this, please?

Assuming you have valid JSON coming from Kafka, you can use a json filter to parse it. Then you can use grok or dissect to extract the folder name from [log][file][path].
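
A rough sketch of the json + dissect variant, under a few assumptions: the kafka input has no json codec, so the raw JSON arrives as a string in the message field; the path always starts with /mnt/volume0/ and the version folder is the directory directly after /services/. The field name version is taken from the question. Treat this as a starting point and adjust it to your actual events.

```
filter {
  # Parse the JSON string delivered by the kafka input into event fields.
  # Assumption: the payload is in "message" (no json codec on the input).
  json {
    source => "message"
  }

  # Split the path on its fixed parts. %{} is a skip field whose value is
  # discarded; the segment after /services/ is stored in "version".
  dissect {
    mapping => {
      "[log][file][path]" => "/mnt/volume0/%{}/services/%{version}/%{}"
    }
  }
}
```

With the path from the question this should set version to free-v-2-0-13. dissect is cheaper than grok because it does not use regular expressions, but it only works when the number of path segments before the version folder is fixed.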


Thanks Badger, it helped me a lot!

For anyone facing the same issue, here is the solution:

filter {
  grok {
    # Capture the directory that follows /services/ into the "version" field.
    match => { "[log][file][path]" => "/mnt/volume0/somename-v[0-9]-free-service/services/(?<version>[^/]+)/" }
  }
}