Hello everyone,
I'm having trouble with logstash when I want to use logstash to log kafka topics here is the full output.
[2019-04-25T09:21:18,836][WARN ][org.apache.kafka.clients.ClientUtils] Couldn't resolve server localhosts:9093 from bootstrap.servers as DNS resolution failed for localhosts
[2019-04-25T09:21:18,850][ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :cause=>org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers}
[2019-04-25T09:21:18,853][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main
Plugin: <LogStash::Inputs::Kafka bootstrap_servers=>"localhosts:9093", topics=>["cars"], id=>"fe6887996428551d58c1bacd44eae84a09ad59bad2652cca2a7f665bda1323af", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_69d42d93-f326-4a7f-9214-b44b11bd3ad4", enable_metric=>true, charset=>"UTF-8">, auto_commit_interval_ms=>"5000", client_id=>"logstash", consumer_threads=>1, enable_auto_commit=>"true", group_id=>"logstash", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", decorate_events=>false>
Error: Failed to construct kafka consumer
Exception: Java::OrgApacheKafkaCommon::KafkaException
Stack: org.apache.kafka.clients.consumer.KafkaConsumer.(org/apache/kafka/clients/consumer/KafkaConsumer.java:805)
org.apache.kafka.clients.consumer.KafkaConsumer.(org/apache/kafka/clients/consumer/KafkaConsumer.java:652)
org.apache.kafka.clients.consumer.KafkaConsumer.(org/apache/kafka/clients/consumer/KafkaConsumer.java:632)
java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor)
org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:278)
org.jruby.RubyClass.newInstance(org/jruby/RubyClass.java:1001)
org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)
C_3a_.Users.akumas.Desktop.logstash_minus_6_dot_6_dot_0.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_kafka_minus_8_dot_3_dot_1.lib.logstash.inputs.kafka.create_consumer(C:/Users/akumas/Desktop/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:328)
C_3a_.Users.akumas.Desktop.logstash_minus_6_dot_6_dot_0.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_kafka_minus_8_dot_3_dot_1.lib.logstash.inputs.kafka.block in run(C:/Users/akumas/Desktop/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:226)
org.jruby.RubyEnumerable$23.call(org/jruby/RubyEnumerable.java:846)
org.jruby.RubyFixnum.times(org/jruby/RubyFixnum.java:305)
org.jruby.RubyFixnum$INVOKER$i$0$0$times.call(org/jruby/RubyFixnum$INVOKER$i$0$0$times.gen)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:522)
org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:393)
org.jruby.RubyEnumerator.each(org/jruby/RubyEnumerator.java:323)
org.jruby.RubyEnumerator$INVOKER$i$each.call(org/jruby/RubyEnumerator$INVOKER$i$each.gen)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:512)
org.jruby.RubyEnumerable.callEach19(org/jruby/RubyEnumerable.java:116)
org.jruby.RubyEnumerable.collectCommon(org/jruby/RubyEnumerable.java:838)
org.jruby.RubyEnumerable.map(org/jruby/RubyEnumerable.java:830)
C_3a_.Users.akumas.Desktop.logstash_minus_6_dot_6_dot_0.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_input_minus_kafka_minus_8_dot_3_dot_1.lib.logstash.inputs.kafka.run(C:/Users/akumas/Desktop/logstash-6.6.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.3.1/lib/logstash/inputs/kafka.rb:226)
C_3a_.Users.akumas.Desktop.logstash_minus_6_dot_6_dot_0.logstash_minus_core.lib.logstash.pipeline.inputworker(C:/Users/akumas/Desktop/logstash-6.6.0/logstash-core/lib/logstash/pipeline.rb:409)
C_3a_.Users.akumas.Desktop.logstash_minus_6_dot_6_dot_0.logstash_minus_core.lib.logstash.pipeline.RUBY$method$inputworker$0$VARARGS(C_3a_/Users/akumas/Desktop/logstash_minus_6_dot_6_dot_0/logstash_minus_core/lib/logstash/C:/Users/akumas/Desktop/logstash-6.6.0/logstash-core/lib/logstash/pipeline.rb)
C_3a_.Users.akumas.Desktop.logstash_minus_6_dot_6_dot_0.logstash_minus_core.lib.logstash.pipeline.block in start_input(C:/Users/akumas/Desktop/logstash-6.6.0/logstash-core/lib/logstash/pipeline.rb:403)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)
java.lang.Thread.run(java/lang/Thread)
and this is my config file;
input{
kafka{
bootstrap_servers =>"localhosts:9093"
topics => ["cars"]
}
}
filter{
csv {
separator =>","
columns => [ "maker", "model", "mileage", "manufacture_year", "engine_displacement", "engine_power", "body_type", "color_slug", "stk_year", "transmission", "door_count", "seat_count", "fuel_type", "date_created", "date_last_seen", "price_eur" ]
}
mutate {convert => ["mileage", "integer"] }
mutate {convert => ["price_eur", "float"] }
mutate {convert => ["engine_power", "integer"] }
mutate {convert => ["door_power", "integer"] }
mutate {convert => ["seat_count", "integer"] }
}
output{
elasticsearch {
hosts => "localhost"
index => "cars"
document_type=>"sold_cars"
}
stdout{}
}