Logstash output to Kafka server

Hello!

I'm running Logstash 7.0.0. It uses a Beats input to receive logs from Filebeat, and the output goes to Kafka. I get my logs correctly for some time after starting Logstash. Then I get the error below and no output reaches Kafka, not even small logs. During this time the Logstash output is choked for every log (even small ones). Why?

After a few hours I get the following error in my Logstash logs, shown in the screenshot.
The error keeps coming until I stop Logstash and start it again.

NOTE: when I restart Logstash, I get output correctly for a few hours, and after that the same error returns.

I want to drop these oversized log events so that the Logstash output does not get choked.

My Logstash config:

input {
    beats {
        port => 5044
    }
}
output {
    if [type] == "app_log" {
        kafka {
            retry_backoff_ms => 30
            linger_ms => 5
            topic_id => "adminapp_logs"
            bootstrap_servers => "127.0.0.1:9092"
            acks => "0"
            codec => "json"
        }
    }
}

I have increased max.request.size, but it does not help.

You can use the retries option so that attempts to write to Kafka are retried only a limited number of times. Note, however, that this also applies when Kafka is down, so you will lose data.
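For example, a minimal sketch of adding it to your existing output (the value 2 is an arbitrary example; tune it for your pipeline):

output {
    kafka {
        # Give up after 2 retries instead of retrying forever
        retries => 2
        topic_id => "adminapp_logs"
        bootstrap_servers => "127.0.0.1:9092"
    }
}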

Thanks for the reply.
But how do I drop logs over a certain size?

By using the retries option. By default it will retry sending that message forever, preventing any other messages from being sent.

Is there any other method to drop logs that are greater than a specified size?

There is no option in the kafka output to treat oversized events differently from other errors.

You could use an ugly hack that estimates the size by converting the event to a string and checking its length. This will drop events whose string representation is more than 230 bytes...

ruby {
    code => '
        # Drop events whose string representation exceeds 230 characters
        # (a rough proxy for the serialized message size)
        if event.to_hash.to_s.length > 230
            event.cancel
        end
    '
}

You could write additional code to better approximate the size of the serialized data, all the way up to calling org.apache.kafka.common.serialization.StringSerializer yourself, although this has the overhead of doing the serialization twice.
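For instance, since your kafka output uses the json codec, a somewhat closer approximation (still a sketch, and still paying for an extra serialization) is to check the byte length of the event's JSON form. The 1 MB threshold here is an assumption; adjust it to your broker's actual message size limit:

ruby {
    code => '
        # Serialize the event to JSON, roughly as codec => "json" would,
        # and drop it if the result exceeds an assumed 1 MB limit
        if event.to_json.bytesize > 1_000_000
            event.cancel
        end
    '
}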


I'm getting the following error when using your ruby code:

[2019-09-03T15:12:15,582][ERROR][logstash.plugins.registry] Tried to load a plugin's code, but failed. {:exception=>#<LoadError: no such file to load -- logstash/outputs/ruby>, :path=>"logstash/outputs/ruby", :type=>"output", :name=>"ruby"}
[2019-09-03T15:12:15,599][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::PluginLoadingError", :message=>"Couldn't find any output plugin named 'ruby'. Are you sure this is correct? Trying to load the ruby output plugin resulted in this error: no such file to load -- logstash/outputs/ruby", :backtrace=>["/opt/logstash/logstash-core/lib/logstash/plugins/registry.rb:211:in `lookup_pipeline_plugin'", "/opt/logstash/logstash-core/lib/logstash/plugin.rb:137:in `lookup'", "org/logstash/plugins/PluginFactoryExt.java:200:in `plugin'", "org/logstash/plugins/PluginFactoryExt.java:137:in `buildOutput'", "org/logstash/execution/JavaBasePipelineExt.java:50:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/java_pipeline.rb:23:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:325:in `block in converge_state'"]}

It's not an output; it should go in the filter section of the configuration.
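For reference, a minimal placement sketch, keeping your existing beats input and kafka output unchanged:

filter {
    ruby {
        code => '
            # Drop events whose string representation exceeds 230 characters
            if event.to_hash.to_s.length > 230
                event.cancel
            end
        '
    }
}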
