use logstash (version 7.3) Sync kafka data to elasticsearch.this is logstash config file
input {
kafka {
codec => "json"
auto_offset_reset => "earliest"
bootstrap_servers => "kafka-ece.crcloud.com:9092"
group_id => "els-test-group"
topics => ["els-test"]
client_id => "logstash-els-test"
}
}
filter {
mutate {
add_field => { "clusterId" => "%{[fields][cluster]}" }
}
}
output {
elasticsearch {
hosts => ["10.200.201.201:9200"]
index => "%{[fields][index]}-%{+yyyy.MM.dd}"
user => "elastic"
password => "xrvWfcOk7BExb1DqciIRVess"
custom_headers => { "X-Found-Cluster"=> "%{[clusterId]}" }
}
}
use X-Found-Cluster to proxy different elasticsearch cluster, need dynamic get from input. now error message
Thread.exclusive is deprecated, use Thread::Mutex
Sending Logstash logs to /app/logstash/logs which is now configured via log4j2.properties
[2020-04-01T18:15:35,312][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-04-01T18:15:35,329][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.3.0"}
[2020-04-01T18:15:36,779][INFO ][org.reflections.Reflections] Reflections took 38 ms to scan 1 urls, producing 19 keys and 39 values
[2020-04-01T18:15:37,564][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@10.200.201.201:9200/]}}
[2020-04-01T18:15:37,899][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@10.200.201.201:9200/"}
[2020-04-01T18:15:37,951][ERROR][logstash.javapipeline ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<NoMethodError: undefined method `[]' for nil:NilClass>, :backtrace=>["/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:444:in `get_es_version'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:249:in `block in healthcheck!'", "org/jruby/RubyHash.java:1419:in `each'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:241:in `healthcheck!'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:341:in `update_urls'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:71:in `start'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client.rb:302:in `build_pool'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client.rb:64:in `initialize'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client_builder.rb:103:in `create_http_client'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/http_client_builder.rb:99:in `build'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch.rb:238:in `build_client'", "/app/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.1.0-java/lib/logstash/outputs/elasticsearch/common.rb:25:in `register'", "org/logstash/config/ir/compiler/OutputStrategyExt.java:106:in `register'", "org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:48:in `register'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:192:in `block in register_plugins'", "org/jruby/RubyArray.java:1792:in `each'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:191:in `register_plugins'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:462:in `maybe_setup_out_plugins'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:204:in `start_workers'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:146:in `run'", "/app/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:105:in `block in start'"], :thread=>"#<Thread:0x2199deaf run>"}
[2020-04-01T18:15:37,966][ERROR][logstash.agent ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
[2020-04-01T18:15:38,225][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2020-04-01T18:15:43,079][INFO ][logstash.runner ] Logstash shut down.