Use Logstash to sync Kafka data to Elasticsearch

I am using Logstash (version 7.3) to sync Kafka data to Elasticsearch. This is the Logstash config file:

input {
  kafka {
    codec => "json"
    auto_offset_reset => "earliest"
    bootstrap_servers => "kafka-ece.crcloud.com:9092"
    group_id => "els-test-group"
    topics => ["els-test"]
    client_id => "logstash-els-test"
  }
}

filter {
  mutate {
    # lift the cluster id out of [fields][cluster] so the output can reference it
    add_field => { "clusterId" => "%{[fields][cluster]}" }
  }
}

output {
  elasticsearch {
    hosts => ["10.200.201.201:9200"]
    index => "%{[fields][index]}-%{+yyyy.MM.dd}"
    user => "elastic"
    password => "xrvWfcOk7BExb1DqciIRVess"
    custom_headers => { "X-Found-Cluster" => "%{[clusterId]}" }
  }
}

I use the X-Found-Cluster header to proxy to different Elasticsearch clusters, so its value needs to be taken dynamically from the input. This is the error message:

Thread.exclusive is deprecated, use Thread::Mutex
Sending Logstash logs to /app/logstash/logs which is now configured via log4j2.properties
[2020-04-01T18:15:35,312][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-04-01T18:15:35,329][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.3.0"}
[2020-04-01T18:15:36,779][INFO ][org.reflections.Reflections] Reflections took 38 ms to scan 1 urls, producing 19 keys and 39 values 
[2020-04-01T18:15:37,564][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@10.200.201.201:9200/]}}
[2020-04-01T18:15:37,899][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@10.200.201.201:9200/"}
[2020-04-01T18:15:37,951][ERROR][logstash.javapipeline    ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<NoMethodError: undefined method `[]' for nil:NilClass>, :backtrace=>["/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:444:in `get_es_version'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:249:in `block in healthcheck!'", "org/jruby/RubyHash.java:1419:in `each'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:241:in `healthcheck!'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:341:in `update_urls'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:71:in `start'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client.rb:302:in `build_pool'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client.rb:64:in `initialize'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client_builder.rb:103:in `create_http_client'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client_builder.rb:99:in `build'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch.rb:238:in `build_client'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/common.rb:25:in `register'", "org/logstash/config/ir/compiler/OutputStrategyExt.java:106:in `register'", "org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:48:in `register'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:192:in `block in register_plugins'", "org/jruby/RubyArray.java:1792:in `each'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:191:in `register_plugins'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:462:in `maybe_setup_out_plugins'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:204:in `start_workers'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:146:in `run'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:105:in `block in start'"], :thread=>"#<Thread:0x2199deaf run>"}
[2020-04-01T18:15:37,966][ERROR][logstash.agent           ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
[2020-04-01T18:15:38,225][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-04-01T18:15:43,079][INFO ][logstash.runner          ] Logstash shut down.

The ES cluster is not returning the expected JSON structure. The code in question is here.

That is because the request never reaches a cluster. 10.200.201.201 is a proxy server; it uses the X-Found-Cluster header to find the right cluster. Right now the proxy can't get a usable header value, so it doesn't know which cluster to route to.
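
To make the failure concrete: the backtrace shows the error inside the startup health check (the get_es_version frames), which runs while the output registers, before any event has been processed. At that point there is no event to resolve the sprintf reference against, so (as far as I can tell) a stripped-down output like this sends the header verbatim:

output {
  elasticsearch {
    hosts => ["10.200.201.201:9200"]
    # at register time no event exists, so this sprintf reference goes out
    # as the literal string "%{[clusterId]}", which the proxy can't route
    custom_headers => { "X-Found-Cluster" => "%{[clusterId]}" }
  }
}

The proxy then returns something other than the cluster-info JSON, and get_es_version fails with the NoMethodError above.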

I do not think sending data to multiple clusters like this will work, as each bulk request could potentially contain events for different clusters, while the headers (I do not know if dynamic headers are supported) are set for the full request and not per event. Instead, create a separate output plugin per cluster and use conditionals, along the lines of the sketch below.
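
A minimal sketch of that approach, assuming [fields][cluster] carries ids like "cluster-a" and "cluster-b" (hypothetical names; substitute the real cluster ids your proxy expects):

output {
  if [fields][cluster] == "cluster-a" {
    elasticsearch {
      hosts => ["10.200.201.201:9200"]
      index => "%{[fields][index]}-%{+yyyy.MM.dd}"
      user => "elastic"
      password => "xrvWfcOk7BExb1DqciIRVess"
      # static value, known before any event is processed
      custom_headers => { "X-Found-Cluster" => "cluster-a" }
    }
  } else if [fields][cluster] == "cluster-b" {
    elasticsearch {
      hosts => ["10.200.201.201:9200"]
      index => "%{[fields][index]}-%{+yyyy.MM.dd}"
      user => "elastic"
      password => "xrvWfcOk7BExb1DqciIRVess"
      custom_headers => { "X-Found-Cluster" => "cluster-b" }
    }
  }
}

Each output block then registers with a static header the proxy can resolve during the startup health check; the cost is one output block per cluster.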
