I am trying to divert output to different secured kafka topics based on the topic name received from HTTP end point.
for single topic my configuration is working fine. If I add if else based on topic name condition intermittently I keep on getting below error.
working cofig
input {
http {
host => "127.0.0.1" # default: 0.0.0.0
port => "9600" # default: 8080
codec => "json"
}
}
filter
{
mutate { add_field => { "[@metadata][topic1]" => "%{[headers][topic]}"} }
mutate { add_field => { "[@metadata][kafka_key]" => "%{[headers][kafka_key]}"} }
mutate {remove_field => [headers] }
}
output {
if [@metadata][topic1] == "data_cop_test_topic" {
kafka {
bootstrap_servers => "xxx"
topic_id => "%{[@metadata][topic1]}"
jaas_path => ["C:\kafka_kerberos\jass1.conf"]
kerberos_config => ["C:\kafka_kerberos\krb5.conf"]
sasl_kerberos_service_name => "kafka"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "GSSAPI"
message_key => "%{[@metadata][kafka_key]}"
codec => json{}
}
}
}
In the output if I add something like
output {
if [@metadata][topic1] == "data_cop_test_topic" {
kafka {
bootstrap_servers => "xxx"
topic_id => "%{[@metadata][topic1]}"
jaas_path => ["C:\kafka_kerberos\jass1.conf"]
kerberos_config => ["C:\kafka_kerberos\krb5.conf"]
sasl_kerberos_service_name => "kafka"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "GSSAPI"
message_key => "%{[@metadata][kafka_key]}"
codec => json{}
}
}
else if [@metadata][topic1] == "CDO_TestTopic1" {
kafka {
bootstrap_servers => "xxx"
topic_id => "%{[@metadata][topic1]}"
jaas_path => ["C:\kafka_kerberos\jass2.conf"]
kerberos_config => ["C:\kafka_kerberos\krb5.conf"]
sasl_kerberos_service_name => "kafka"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "GSSAPI"
message_key => "%{[@metadata][kafka_key]}"
codec => json{}
}
}
}
I start getting below error.
[2019-08-26T13:07:39,239][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0
[2019-08-26T13:07:39,239][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f
[2019-08-26T13:07:39,316][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>3, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>375, :thread=>"#<Thread:0x49fef6e8 run>"}
[2019-08-26T13:07:39,973][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2019-08-26T13:07:40,020][INFO ][logstash.inputs.http ] Starting http input listener {:address=>"127.0.0.1:9600", :ssl=>"false"}
[2019-08-26T13:07:40,083][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>}
[2019-08-26T13:07:40,958][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
[2019-08-26T13:08:17,913][INFO ][logstash.outputs.file ] Opening file {:path=>"C:/logstash-7.1.1/raw/2019.08.26.07.txt"}
[2019-08-26T13:08:19,411][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: ZlmpEdElTL2NfyRvBn6ufA
[2019-08-26T13:08:20,208][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [data_cop_test_topic] {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [data_cop_test_topic]}
[2019-08-26T13:08:20,224][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1}
[2019-08-26T13:08:20,676][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [data_cop_test_topic] {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [data_cop_test_topic]}
[2019-08-26T13:08:20,676][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1}
Can someon please suggest if they had faced similar issue with multiple secured kafka topics?