Hi,
I'm trying to consume Kafka data hosted on an on-prem Kafka bus with Logstash running in a cloud-hosted environment. There is a single massive topic with about 100 partitions, and the messages I need are all located in a single partition. Is there any way to have my Kafka input plugin filter the messages at the source, i.e. make sure only the data for that partition gets transferred, in order to reduce bandwidth consumption? For now I'm filtering on a message key as shown below (simplified for readability), but my understanding is that this transfers all the data and only then filters it in my cloud environment.
input {
  kafka {
    auto_offset_reset       => "${kafka_auto_offset_reset}"  # where to start if this consumer group never ran before
    bootstrap_servers       => "${kafka_bootstrap_servers}"  # kafka bootstrap servers
    decorate_events         => "${decorate_events}"          # append kafka metadata to the output. If false, disable the associated mutator in the filter section
    group_id                => "${kafka_group_id}"           # identifier of this listener group. Used to register and remember offsets in case of crashes
    security_protocol       => "SSL"
    ssl_keystore_location   => "/usr/share/logstash/auth/keystore.jks"
    ssl_keystore_password   => "${ssl_keystore_password}"
    ssl_keystore_type       => "JKS"
    ssl_truststore_location => "/usr/share/logstash/config/cacerts"
    ssl_truststore_password => "${ssl_truststore_password}"
    ssl_truststore_type     => "JKS"
    topics                  => "${kafka_topic}"              # kafka topic to read
  }
}
filter {
  mutate {
    # copy the kafka metadata added by decorate_events into the event
    copy => { "[@metadata][kafka]" => "kafka" }
    # expose the configured message key as metadata so it can be compared in the output conditional
    add_field => { "[@metadata][messagekey_filter]" => "${kafka_messagekey}" }
  }
}
output {
  # only ship events whose Kafka message key matches the configured filter value
  if [@metadata][kafka][key] == [@metadata][messagekey_filter] {
    elasticsearch {
      hosts           => ["${elasticsearch_host}"]
      index           => "${elasticsearch_index}"
      user            => "${elasticsearch_user}"
      password        => "${elasticsearch_password}"
      ssl             => true
      cacert          => "mycert.pem"
      ilm_enabled     => "false"
      manage_template => "false"
    }
  }
}
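I realize I could also express the same key filtering with a drop filter instead of the conditional around the output, e.g. something like the sketch below (still assumes decorate_events is enabled so [@metadata][kafka][key] is populated, and reuses the same ${kafka_messagekey} variable). Either way the full topic still crosses the network, which is exactly what I'm trying to avoid.

filter {
  mutate {
    # expose the configured message key as metadata so it can be used in the conditional
    add_field => { "[@metadata][messagekey_filter]" => "${kafka_messagekey}" }
  }
  # discard everything whose Kafka key does not match; only matching events reach the output
  if [@metadata][kafka][key] != [@metadata][messagekey_filter] {
    drop { }
  }
}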