Logstash not sending data to Elasticsearch from Kafka

Hi,

I'm trying to index data into Elasticsearch with Logstash, with Kafka as the input.
This is the simple config I'm using on the Logstash server, in /etc/logstash/conf.d/1-config.conf:

input {
  kafka {
    bootstrap_servers => "dummykafka:9092"
    topics => ["dummy.kafka.topic"]
  }
}

output {
  elasticsearch {
    index => "test.winlogbeat.eventlogs-%{+YYYY.MM.dd}"
    hosts => ["elasticdatanode:50000"]
    user => "AdminID"
    password => "dummypassword"
  }
}
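(Side note: to see whether events are coming off Kafka at all, one option is a temporary stdout output next to the elasticsearch one. A minimal sketch of the same output section:)

output {
  # temporary debug output: prints every event to the Logstash log/console
  stdout { codec => rubydebug }

  elasticsearch {
    index => "test.winlogbeat.eventlogs-%{+YYYY.MM.dd}"
    hosts => ["elasticdatanode:50000"]
    user => "AdminID"
    password => "dummypassword"
  }
}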

I restart Logstash with sudo initctl restart logstash.
After the restart the data should show up in Elasticsearch, but it doesn't. The data is definitely present in Kafka.
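(To double-check the Kafka side, the console consumer that ships with Kafka can replay the topic from the start. The script path depends on the install, and older 0.10.x releases may also need a --new-consumer flag:)

# read the topic from the beginning to confirm the messages are really there
bin/kafka-console-consumer.sh --bootstrap-server dummykafka:9092 \
  --topic dummy.kafka.topic --from-beginning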

Can you please help? Or: how do I enable logging in Logstash?
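(Answering my own question for anyone who finds this later: on a package install the logs typically live under /var/log/logstash/, and verbosity can be raised via log.level in /etc/logstash/logstash.yml. Paths assumed for a standard 5.x package install:)

# follow the main Logstash log
tail -f /var/log/logstash/logstash-plain.log

# for more detail, set this in /etc/logstash/logstash.yml and restart:
# log.level: debug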

So I found the Logstash logs, but I don't understand what this error means:

[2017-03-16T09:46:43,950][ERROR][logstash.agent ] Pipeline aborted due to error {:exception=>#<URI::InvalidComponentError: bad component(expected user component): ?l&DATa_tknFsIr>, :backtrace=>[
"/usr/share/logstash/vendor/jruby/lib/ruby/1.9/uri/generic.rb:440:in `check_password'",
"/usr/share/logstash/vendor/jruby/lib/ruby/1.9/uri/generic.rb:512:in `password='",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:246:in `normalize_url'",
"org/jruby/RubyArray.java:2414:in `map'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:256:in `update_urls'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:66:in `initialize'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch/http_client.rb:183:in `build_pool'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch/http_client.rb:35:in `initialize'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch/http_client_builder.rb:57:in `build'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch.rb:196:in `build_client'",
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch/common.rb:13:in `register'",
"/usr/share/logstash/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:8:in `register'",
"/usr/share/logstash/logstash-core/lib/logstash/output_delegator.rb:37:in `register'",
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:229:in `start_workers'",
"org/jruby/RubyArray.java:1613:in `each'",
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:229:in `start_workers'",
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:183:in `run'",
"/usr/share/logstash/logstash-core/lib/logstash/agent.rb:292:in `start_pipeline'"]}

Hey, I found the answer to this in another post:

the password needs to be URL-encoded before it goes into the config.
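For example, with a made-up password p?ss&word (not my real one), the ? and & are what trip the URI parser, so the config has to carry the percent-encoded form:

output {
  elasticsearch {
    index => "test.winlogbeat.eventlogs-%{+YYYY.MM.dd}"
    hosts => ["elasticdatanode:50000"]
    user => "AdminID"
    # made-up raw password "p?ss&word", percent-encoded: ? becomes %3F, & becomes %26
    password => "p%3Fss%26word"
  }
}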

Thanks :slight_smile:

But now I have another issue.
The Logstash logs (below) show no errors after I restart Logstash, but it does not create any indices in Elasticsearch. The config is the same as above.

Is there something I'm missing? Do we have to create index templates in Elasticsearch ourselves, or does Logstash create them by default?
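(Judging by the logs below, Logstash does seem to install its own default template. Whether it is already in the cluster can presumably be checked with something like this, using the endpoint and credentials from the config above and assuming the template is named "logstash":)

# show the index template that Logstash manages
curl -u AdminID 'http://elasticdatanode:50000/_template/logstash?pretty'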

[2017-03-16T10:39:49,975][WARN ][logstash.runner ] SIGTERM received. Shutting down the agent.
[2017-03-16T10:39:49,987][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
[2017-03-16T10:40:12,925][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [dummykafka:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = logstash
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = logstash
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest

[2017-03-16T10:40:13,017][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.10.0.1
[2017-03-16T10:40:13,017][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : a7a17cere4fda6c5
[2017-03-16T10:40:13,200][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Discovered coordinator dummykafka:9092 (id: 2147483645 rack: null) for group logstash.
[2017-03-16T10:40:13,209][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Revoking previously assigned partitions [] for group logstash
[2017-03-16T10:40:13,210][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (Re-)joining group logstash
[2017-03-16T10:40:13,226][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Successfully joined group logstash with generation 1
[2017-03-16T10:40:13,227][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions [test.kafka.topic] for group logstash
[2017-03-16T10:40:13,327][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>["http://~hidden~:~hidden~@elasticnode:50000"]}}
[2017-03-16T10:40:13,329][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:url=>#<URI::HTTP:0xb6331bc URL:http://~hidden~:~hidden~@elasticnode:50000>, :healthcheck_path=>"/"}
[2017-03-16T10:40:13,566][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0xb6331bc URL:http://~hidden~:~hidden~@elasticnode:50000>}
[2017-03-16T10:40:13,572][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-03-16T10:40:13,693][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-03-16T10:40:13,714][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["elasticnode:50000"]}
[2017-03-16T10:40:13,721][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-03-16T10:40:13,785][INFO ][logstash.pipeline ] Pipeline main started
[2017-03-16T10:40:13,884][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
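(To see whether the expected index was actually created, listing the indices should settle it; same endpoint and credentials as in the config above:)

# list all indices; a test.winlogbeat.eventlogs-YYYY.MM.dd entry should appear once events flow
curl -u AdminID 'http://elasticdatanode:50000/_cat/indices?v'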
