Good day!
I have this working config for Logstash:
    input {
      kafka {
        zk_connect => "kafka:2181"
        topic_id => "requests"
        group_id => "kafka_requests"
        consumer_threads => 1
      }
    }
    output {
      elasticsearch {
        codec => "json"
        index => "kafka_requests-%{+YYYY.MM.dd}"
      }
    }
But the documents in Elasticsearch do not contain the Kafka message offset (the message number).
I added

    decorate_events => true

to the kafka input section, and each event now ends with:

    "kafka":{"msg_size":150,"topic":"requests","consumer_group":"kafka_requests",
    "partition":2,"offset":1411769133,"key":null}}
Question:
How can I use the "offset" subfield of "kafka" (here "offset":1411769133) from the input as the document_id ( _id ) in the elasticsearch output?
I need this to avoid duplicate messages, to check that all messages were put into Elasticsearch, and so on.
Thank you!
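To make the goal concrete, here is a minimal sketch of the kind of config this is after. It assumes the nested field can be referenced with Logstash's `%{[field][subfield]}` sprintf syntax inside the `document_id` option of the elasticsearch output; that combination is an assumption and is untested here. Because Kafka offsets are only unique within a partition, the sketch also puts the partition into the ID.

```
input {
  kafka {
    zk_connect => "kafka:2181"
    topic_id => "requests"
    group_id => "kafka_requests"
    consumer_threads => 1
    # adds the "kafka" metadata field (topic, partition, offset, ...) to each event
    decorate_events => true
  }
}
output {
  elasticsearch {
    codec => "json"
    index => "kafka_requests-%{+YYYY.MM.dd}"
    # assumption: build _id from partition and offset so that re-reading
    # the same message overwrites the document instead of duplicating it
    document_id => "%{[kafka][partition]}-%{[kafka][offset]}"
  }
}
```

With an ID like this, the message shown above would be stored with _id "2-1411769133".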