MSK to Elasticsearch using Logstash

I have created an MSK cluster in AWS, as well as an Elasticsearch cluster hosted in AWS.
I am trying to read data from a topic in MSK and send it to an Elasticsearch index.

input {
  kafka {
    bootstrap_servers => "x:9096"
    topics => ["y"]
    security_protocol => "SASL_SSL"
    sasl_mechanism => "SCRAM-SHA-512"
    jaas_path => "/home/ec2-user/jaas_path.conf"  # Provide the path to your jaas.conf file
    codec => json
    group_id => "logstash_consumer_group"
    auto_offset_reset => "earliest"
  }
}



output {
  elasticsearch {
    index => "sit1" # Replace with your desired index name pattern
    hosts => "https://zzzz:443"
    user => "${ELASTIC_USERNAME}"
    password => "${ELASTIC_PASSWORD}"
  }
  stdout { codec => rubydebug }
}

This is my Logstash configuration. I can telnet to my ES cluster, but I am getting this error:

[ERROR] 2023-12-26 11:04:20.788 [Ruby-0-Thread-5: :1] elasticsearch - Failed to install template. {:message=>"Got response code '401' contacting Elasticsearch at URL 'https://asdgsdf:443/_xpack'", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError", :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb:80:in `perform_request'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:332:in `perform_request_to_url'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:319:in `block in perform_request'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:414:in `with_connection'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:318:in `perform_request'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:326:in `block in Pool'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client.rb:162:in `get'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client.rb:382:in `get_xpack_info'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/ilm.rb:57:in `ilm_ready?'", 
"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/ilm.rb:28:in `ilm_in_use?'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/template_manager.rb:15:in `install_template'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/common.rb:218:in `install_template'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/common.rb:49:in `block in setup_after_successful_connection'"]}
[INFO ] 2023-12-26 11:04:20.790 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, "pipeline.sources"=>["/etc/logstash/conf.d/topic-sit.conf"], :thread=>"#<Thread:0x6a76847c run>"}
warning: thread "Ruby-0-Thread-5: :1" terminated with exception (report_on_exception is true):
LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError: Got response code '401' contacting Elasticsearch at URL 'https://asdgsdf:443/_xpack'
                    perform_request at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb:80
             perform_request_to_url at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:332
                    perform_request at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:319
                    with_connection at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:414
                    perform_request at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:318
                               Pool at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:326
                                get at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client.rb:162
                     get_xpack_info at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client.rb:382
                         ilm_ready? at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/ilm.rb:57
                        ilm_in_use? at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/ilm.rb:28
  setup_after_successful_connection at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/common.rb:50
[FATAL] 2023-12-26 11:04:20.998 [LogStash::Runner] runner - An unexpected error occurred! {:error=>#<LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError: LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb:80:in `perform_request'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:332:in `perform_request_to_url'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:319:in `block in perform_request'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:414:in `with_connection'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:318:in `perform_request'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:326:in `block in Pool'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client.rb:162:in `get'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/http_client.rb:382:in `get_xpack_info'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/ilm.rb:57:in `ilm_ready?'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/ilm.rb:28:in `ilm_in_use?'", 
"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.7.3-java/lib/logstash/outputs/elasticsearch/common.rb:50:in `block in setup_after_successful_connection'"]}

You need to check your credentials. A 401 error means Unauthorized.

Also, is this self-hosted or are you using the AWS Elasticsearch service?

With the same credentials, curl works.
Yes, I am using the AWS Elasticsearch service.
Both my ES and Logstash are version 7.10.2.

The AWS Elasticsearch service is not really Elasticsearch; it is a fork by Amazon with some differences. I'm not sure you can use the elasticsearch output in this case. You may need to install the opensearch output and use that instead.
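
If you do try the opensearch output, a minimal sketch might look like the following. It assumes the logstash-output-opensearch plugin is installed (for example with `bin/logstash-plugin install logstash-output-opensearch`) and simply reuses the host, index, and environment variables from your config. I have not tested it against your domain, and you should check the plugin's compatibility notes for Logstash 7.10.2.

```
output {
  opensearch {
    # Assumption: same endpoint, index, and credentials as the original elasticsearch output
    hosts    => ["https://zzzz:443"]
    index    => "sit1"
    user     => "${ELASTIC_USERNAME}"
    password => "${ELASTIC_PASSWORD}"
  }
  stdout { codec => rubydebug }
}
```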

As the error you shared shows, your output is getting a 401, so you need to confirm your credentials. Since this is not Elasticsearch, you may also want to ask on an AWS forum about this issue.
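
For what it's worth, the backtrace shows the 401 being raised while the output checks for ILM support (`ilm_ready?` calling `get_xpack_info` on `/_xpack`), not while indexing events. One thing that may be worth trying, and this is an assumption on my part and not a confirmed fix, is disabling ILM in the elasticsearch output so that check is skipped:

```
output {
  elasticsearch {
    hosts    => "https://zzzz:443"
    index    => "sit1"
    user     => "${ELASTIC_USERNAME}"
    password => "${ELASTIC_PASSWORD}"
    # Assumption: turning ILM off avoids the /_xpack request seen in the backtrace
    ilm_enabled => false
  }
  stdout { codec => rubydebug }
}
```

If the 401 persists on other requests after this change, the problem is more likely the credentials or the domain's access policy.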

OpenSearch/OpenDistro are AWS-run products and differ from the original Elasticsearch and Kibana products that Elastic builds and maintains. You may need to contact them directly for further assistance.

(This is an automated response from your friendly Elastic bot. Please report this post if you have any suggestions or concerns.)
