Error while routing output to multiple secured Kafka topics

I am trying to route output to different secured Kafka topics based on the topic name received from an HTTP endpoint.

With a single topic, my configuration works fine. When I add an if / else if condition on the topic name, I intermittently get the error shown below.

Working config:

input {
  http {
    host => "127.0.0.1" # default: 0.0.0.0
    port => "9600" # default: 8080
    codec => "json"
  }
}

filter {
  mutate { add_field => { "[@metadata][topic1]" => "%{[headers][topic]}" } }
  mutate { add_field => { "[@metadata][kafka_key]" => "%{[headers][kafka_key]}" } }
  mutate { remove_field => ["headers"] }
}

output {
  if [@metadata][topic1] == "data_cop_test_topic" {
    kafka {
      bootstrap_servers => "xxx"
      topic_id => "%{[@metadata][topic1]}"
      jaas_path => ["C:\kafka_kerberos\jass1.conf"]
      kerberos_config => ["C:\kafka_kerberos\krb5.conf"]
      sasl_kerberos_service_name => "kafka"
      security_protocol => "SASL_PLAINTEXT"
      sasl_mechanism => "GSSAPI"
      message_key => "%{[@metadata][kafka_key]}"
      codec => json {}
    }
  }
}

If I change the output section to something like this:

output {
  if [@metadata][topic1] == "data_cop_test_topic" {
    kafka {
      bootstrap_servers => "xxx"
      topic_id => "%{[@metadata][topic1]}"
      jaas_path => ["C:\kafka_kerberos\jass1.conf"]
      kerberos_config => ["C:\kafka_kerberos\krb5.conf"]
      sasl_kerberos_service_name => "kafka"
      security_protocol => "SASL_PLAINTEXT"
      sasl_mechanism => "GSSAPI"
      message_key => "%{[@metadata][kafka_key]}"
      codec => json {}
    }
  }
  else if [@metadata][topic1] == "CDO_TestTopic1" {
    kafka {
      bootstrap_servers => "xxx"
      topic_id => "%{[@metadata][topic1]}"
      jaas_path => ["C:\kafka_kerberos\jass2.conf"]
      kerberos_config => ["C:\kafka_kerberos\krb5.conf"]
      sasl_kerberos_service_name => "kafka"
      security_protocol => "SASL_PLAINTEXT"
      sasl_mechanism => "GSSAPI"
      message_key => "%{[@metadata][kafka_key]}"
      codec => json {}
    }
  }
}

I start getting the error below:

[2019-08-26T13:07:39,239][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0
[2019-08-26T13:07:39,239][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f
[2019-08-26T13:07:39,316][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>3, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>375, :thread=>"#<Thread:0x49fef6e8 run>"}
[2019-08-26T13:07:39,973][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2019-08-26T13:07:40,020][INFO ][logstash.inputs.http ] Starting http input listener {:address=>"127.0.0.1:9600", :ssl=>"false"}
[2019-08-26T13:07:40,083][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-08-26T13:07:40,958][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
[2019-08-26T13:08:17,913][INFO ][logstash.outputs.file ] Opening file {:path=>"C:/logstash-7.1.1/raw/2019.08.26.07.txt"}
[2019-08-26T13:08:19,411][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: ZlmpEdElTL2NfyRvBn6ufA
[2019-08-26T13:08:20,208][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [data_cop_test_topic] {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [data_cop_test_topic]}
[2019-08-26T13:08:20,224][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1}
[2019-08-26T13:08:20,676][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [data_cop_test_topic] {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [data_cop_test_topic]}
[2019-08-26T13:08:20,676][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1}

Can someone please advise if they have faced a similar issue with multiple secured Kafka topics?
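
One possible cause, offered as an assumption rather than a confirmed diagnosis: the kafka output plugin documentation notes that jaas_path and kerberos_config are added to the global JVM system properties, so the two kafka outputs in one Logstash process may end up sharing a single JAAS file, and therefore a single principal. That would fit an authorization error on one topic that appears only after the second output is added. The sketch below uses the plugin's per-instance sasl_jaas_config option instead, assuming the installed plugin version supports it. The keytab paths and principals are hypothetical placeholders; the login module line should mirror whatever each existing JAAS file (jass1.conf, jass2.conf) actually contains.

```
output {
  if [@metadata][topic1] == "data_cop_test_topic" {
    kafka {
      bootstrap_servers => "xxx"
      topic_id => "%{[@metadata][topic1]}"
      # Per-instance JAAS settings instead of the JVM-wide jaas_path.
      # Keytab path and principal are hypothetical placeholders.
      sasl_jaas_config => "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab='C:/kafka_kerberos/user1.keytab' principal='user1@EXAMPLE.COM';"
      kerberos_config => ["C:\kafka_kerberos\krb5.conf"]
      sasl_kerberos_service_name => "kafka"
      security_protocol => "SASL_PLAINTEXT"
      sasl_mechanism => "GSSAPI"
      message_key => "%{[@metadata][kafka_key]}"
      codec => json {}
    }
  }
  else if [@metadata][topic1] == "CDO_TestTopic1" {
    kafka {
      bootstrap_servers => "xxx"
      topic_id => "%{[@metadata][topic1]}"
      # Second principal, again a hypothetical placeholder.
      sasl_jaas_config => "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab='C:/kafka_kerberos/user2.keytab' principal='user2@EXAMPLE.COM';"
      kerberos_config => ["C:\kafka_kerberos\krb5.conf"]
      sasl_kerberos_service_name => "kafka"
      security_protocol => "SASL_PLAINTEXT"
      sasl_mechanism => "GSSAPI"
      message_key => "%{[@metadata][kafka_key]}"
      codec => json {}
    }
  }
}
```

The krb5.conf setting is left unchanged because both outputs already point to the same file, so sharing it across the JVM should not cause a conflict.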
