Logstash writes to a nonexistent Kafka topic, but no errors/warnings in the debug log

Logstash writes events to Kafka, but I forgot to create the topic on Kafka first. When I checked the Logstash logs, I couldn't find any error or warning messages. Is this a bug in Logstash 5.5, i.e. that Logstash doesn't log the Kafka write error? Do newer versions of Logstash (like 6.8 or 7.7) fix this?

Logstash 5.5

Logstash config:

    input {
        file {
            path => ["/tmp/test.log"]
            add_field => { "log_group" => "game-shqz-monitoring" }
            add_field => { "type" => "game-shqz-monitoring-direct" }
        }
    }

    filter {
        json {
            source => "message"
        }
    }

    output {
        kafka {
            codec => json
            bootstrap_servers => "****"
            topic_id => "notexist"
            sasl_mechanism => "ONS"
            jaas_path => "/etc/logstash/jaas.conf"
            ssl_truststore_password => "KafkaOnsClient"
            ssl_truststore_location => "/etc/logstash/kafka.client.truststore.jks"
        }
    }
Debug Log

[2021-10-08T14:55:35,739][DEBUG][logstash.inputs.file     ] writing sincedb (delta since last write = 1633676135)
[2021-10-08T14:55:35,780][DEBUG][logstash.pipeline        ] filter received {"event"=>{"path"=>"/tmp/test.log", "@timestamp"=>2021-10-08T06:55:35.598Z, "log_group"=>"game-shqz-monitoring", "@version"=>"1", "host"=>"localhost", "message"=>"{\"test\":\"abc\"}", "type"=>"game-shqz-monitoring-direct"}}
[2021-10-08T14:55:35,783][DEBUG][logstash.filters.json    ] Running json filter {:event=>2021-10-08T06:55:35.598Z localhost {"test":"abc"}}
[2021-10-08T14:55:35,958][DEBUG][logstash.filters.json    ] Event after json filter {:event=>2021-10-08T06:55:35.598Z localhost {"test":"abc"}}
[2021-10-08T14:55:35,961][DEBUG][logstash.pipeline        ] output received {"event"=>{"path"=>"/tmp/test.log", "@timestamp"=>2021-10-08T06:55:35.598Z, "log_group"=>"game-shqz-monitoring", "test"=>"abc", "@version"=>"1", "host"=>"localhost", "message"=>"{\"test\":\"abc\"}", "type"=>"game-shqz-monitoring-direct"}}
[2021-10-08T14:55:40,566][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2021-10-08T14:55:45,566][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2021-10-08T14:55:49,751][DEBUG][logstash.inputs.file     ] _globbed_files: /tmp/test.log: glob is: ["/tmp/test.log"]

Source code review:
I found that no callback argument is specified for the `@producer.send` method. Since `send` is an asynchronous method, how does Logstash know the async send succeeded without a callback?
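To illustrate the point about the missing callback, here is a minimal, self-contained sketch in plain Python. It simulates an async producer — it is not the real Kafka client, and `FakeProducer` and its error are made up — to show why a fire-and-forget `send()` never surfaces delivery errors, while passing a callback does:

```python
# Illustrative simulation only (not the real Kafka client): why a
# fire-and-forget async send() loses errors. FakeProducer is hypothetical.
from concurrent.futures import ThreadPoolExecutor

class FakeProducer:
    """Mimics the shape of an async producer: send() returns a Future."""
    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=1)

    def send(self, record, callback=None):
        def deliver():
            # Simulate the broker rejecting the record (e.g. unknown topic).
            raise RuntimeError("UNKNOWN_TOPIC_OR_PARTITION")
        future = self._pool.submit(deliver)
        if callback is not None:
            # Like the Java KafkaProducer Callback: invoked with the error, if any.
            future.add_done_callback(lambda f: callback(f.exception()))
        return future

producer = FakeProducer()

# 1) Fire-and-forget: the returned Future is dropped, so the delivery
#    error is never observed and nothing is logged.
producer.send("record-1")

# 2) With a callback: the error surfaces and could be logged.
errors = []
producer.send("record-2", callback=lambda exc: errors.append(exc))
producer._pool.shutdown(wait=True)  # wait for delivery attempts to finish
print(errors)  # the callback captured the simulated broker error
```

The real Java `KafkaProducer.send` behaves analogously: errors are only visible if the caller blocks on the returned future or supplies a callback.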

There is no bug, this is the expected behavior.

You do not need to create the topic before sending messages. By default, Kafka will create the topic when a producer sends a message to a non-existent topic. The Logstash Kafka output is a producer, so when it sends a message the topic will be created.
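The broker-side setting that controls this behavior lives in the broker's `server.properties` (shown with its documented default; your cluster may override it):

    # server.properties on each broker
    # default is true: topics are created on first produce/fetch request
    auto.create.topics.enable=true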

Also, Logstash 5.5 is very old and a lot has changed since then; the code you shared is from the latest version of the Kafka output, which is not the same code used by the output in version 5.5.

Thanks for your answer!
But I checked my Kafka settings and found that auto.create.topics.enable is set to false, so the topic shouldn't be created when Kafka receives a message for a non-existent topic from a producer.

I also compared the 5.x version with the latest version of the code mentioned above: the `@producer.send(record)` line is the same in both.

Oh I see, with auto.create.topics.enable set to false the topic shouldn't be created.

But again, this does not seem to be a Logstash issue. The Logstash Kafka output is just a producer; it uses the Java KafkaProducer API to send messages to topics, and it is Kafka that handles the request and sends any error back to Logstash.

If, even with auto.create.topics.enable set to false and the topic not existing on the broker, Logstash is still able to send messages to this topic, then something is probably misconfigured on Kafka's side, as Logstash does not seem to use the Admin API to create topics; at least I didn't find anything about it in the documentation or code.

Can you test this again and share the logs from when Logstash starts and connects to Kafka, as well as the logs from the Kafka server?
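To rule out the topic having been created after all, you can check directly on the broker with the shell tools shipped with Kafka (for 0.10.x-era brokers the topic tool talks to ZooKeeper; the addresses and topic name below are placeholders):

    # list all topics known to the cluster
    bin/kafka-topics.sh --zookeeper localhost:2181 --list

    # describe the topic the Logstash output writes to
    bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic notexist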

Also, which version of Kafka are you using and how many brokers do you have? If you have more than one, do they all have the same config?

You mean there may already be errors/warnings in the Logstash log during startup, while Logstash connects to Kafka (does Logstash check for the topic's existence when it starts?), rather than later when it sends messages to Kafka?

The topic_id can be sprintf'd from a field on the event, so the output cannot possibly know when it is initialized what topics it will be asked to write to in the future.
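For example, the topic can come from a field on the event, so it is only known at send time (the field name here is just an illustration, reusing the `type` field from the config above):

    output {
        kafka {
            bootstrap_servers => "****"
            # resolved per event, e.g. "game-shqz-monitoring-direct"
            topic_id => "%{type}"
        }
    }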

Kafka version is 0.10.2 with 1 broker (in our production env we deploy 3 with the same config).

Agreed, but the problem is: why doesn't Logstash record the error message from Kafka in its own log file? :rofl:

Now I'm confused: is Logstash writing into the topic or not? If it is writing into the topic, then there is no error to be shown.

Since you said that Logstash writes to the topic, I'm assuming the issue could be in Kafka. If it cannot write and is also not logging any error, then it could be a bug, and if it is a bug it may already be fixed in later versions, as 5.5 is pretty old.

Is Logstash writing to the topic or not? Can you replicate your pipeline with a newer version to see what happens?

By the phrase "writing to topic" I meant "sending to topic". Logstash just sends the message to the Kafka topic, but doesn't know whether the message was written to the non-existent topic successfully.

OK, I will test it later

Does Logstash use the Kafka Admin API to create the non-existent topic internally (or do I have to specify that in the Logstash Kafka output config?) even when Kafka does not have auto topic creation enabled?

According to the code of the output plugin it uses only the Producer API.

> Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.
