Below is the config I wrote to create headers (key & value) in my Logstash pipeline, but the headers are still not reflected under "Headers" in the offset tool.
input {
  # Periodically pull recent documents from Elasticsearch, oldest first.
  elasticsearch {
    hosts => ["localhost"]
    index => "some_index*"
    user => "elasticsearch"
    password => "password"

    # Cron schedule: run the query every 2 minutes.
    schedule => "*/2 * * * *"

    # The time window must be at least as long as the schedule interval.
    # The previous window ("now-1m") covered only half of each 2-minute
    # interval, so documents from the other minute were never fetched.
    query => '{ "sort" : [ {"@timestamp" : {"order" : "asc"}} ], "query": { "range": { "@timestamp": {"gte": "now-2m","lt": "now"} }}}'

    # Page size used while fetching the results.
    size => 1000

    # Also attach document information (index, id, ...) to each event.
    docinfo => true
  }
}
filter {
  # Stage the desired header values on the event.
  # Fields under [@metadata] are not serialized by outputs, so they have no
  # effect unless an output refers to them explicitly, e.g. with a sprintf
  # reference such as "%{[@metadata][header_key1]}".
  mutate {
    add_field => {
      "[@metadata][header_key1]" => "Hello World"
      "[@metadata][header_key2]" => "abc"
    }
  }
}
output {
  # Publish every event to the Kafka topic as a JSON string over SASL_SSL.
  kafka {
    bootstrap_servers => "kafka-server1:9096"
    topic_id => "INT_TEST"
    client_id => "CLUSTER"

    # Authentication / transport security.
    security_protocol => "SASL_SSL"
    sasl_mechanism => "SCRAM-SHA-512"
    ssl_endpoint_identification_algorithm => "https"
    sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='abc' password='password';"

    key_serializer => "org.apache.kafka.common.serialization.StringSerializer"
    value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
    codec => json {}

    # Kafka record headers. Setting [@metadata] fields in the filter stage is
    # not enough on its own: the output only sends headers that are listed
    # here. Keys are the header names; values are strings and may interpolate
    # event fields with %{...}.
    # NOTE(review): `message_headers` needs a recent logstash-integration-kafka
    # plugin (believed to be 11.4.0+ -- confirm against the installed version
    # and upgrade the plugin if Logstash rejects this setting).
    message_headers => {
      "header_key1" => "%{[@metadata][header_key1]}"
      "header_key2" => "%{[@metadata][header_key2]}"
    }
  }
}
Please help: how can we configure headers for messages sent to a Kafka topic in the Logstash configuration?