How to connect Kafka with Elasticsearch?

Hi everyone, I am new in Kafka, I use kafka to collect netflow through logstash(it is ok), and I want to send the data to elasticsearch from kafka, but there are some problem.
netflow to kafka logstash config:

input{
	udp{
		host => "120.127.XXX.XX"
		port => 5556
		codec => netflow
	}
}
	filter{
		
	}
output {
  kafka {
    bootstrap_servers => "localhost:9092"    
    topic_id => "test"    
  }
  stdout{codec=> rubydebug}
}

kafka to elasticsearch logstash:

input {
      kafka { }
    }
    output {
        elasticsearch {
            hosts => ["120.127.XXX.XX:9200"]
        }
    	stdout{codec=> rubydebug}
    }

log:
D:\ELK\logstash-6.1.1\bin>logstash -f kafkatoES.conf --path.data D:\ELK\logstash-6.1.1\datatest
Sending Logstash's logs to D:/ELK/logstash-6.1.1/logs which is now configured via log4j2.properties
[2018-02-01T18:52:59,713][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"D:/ELK/logstash-6.1.1/modules/fb_apache/configuration"}
[2018-02-01T18:52:59,728][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"D:/ELK/logstash-6.1.1/modules/netflow/configuration"}
[2018-02-01T18:53:00,072][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-02-01T18:53:01,070][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.1.1"}
[2018-02-01T18:53:01,804][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
[2018-02-01T18:53:09,024][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://120.127.XX.XX:9200/]}}
[2018-02-01T18:53:09,040][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://120.127.XX.XX:9200/, :path=>"/"}
[2018-02-01T18:53:09,305][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://120.127.XX.XX:9200/"}
[2018-02-01T18:53:09,383][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>nil}
[2018-02-01T18:53:09,383][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-02-01T18:53:09,415][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-02-01T18:53:09,430][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"default"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-02-01T18:53:09,493][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//120.127.XXX.XX:9200"]}
[2018-02-01T18:53:09,524][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>2000, :thread=>"#<Thread:0x45e62903 run>"}
[2018-02-01T18:53:09,609][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"main"}
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/ELK/logstash-6.1.1/logstash-core/lib/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/ELK/logstash-6.1.1/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/vendor/jar-dependencies/runtime-jars/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[2018-02-01T18:53:09,789][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-02-01T18:53:09,852][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = logstash-0
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2018-02-01T18:53:09,945][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.11.0.0
[2018-02-01T18:53:09,945][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : cb8625948210849f
[2018-02-01T18:53:10,149][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Discovered coordinator winoc-netflow:9092 (id: 2147483647 rack: null) for group logstash.
[2018-02-01T18:53:10,164][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Revoking previously assigned partitions [] for group logstash
[2018-02-01T18:53:10,164][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (Re-)joining group logstash
[2018-02-01T18:53:10,180][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Successfully joined group logstash with generation 6
[2018-02-01T18:53:10,180][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions [logstash-0] for group logstash

thank you in advance!

(I'm copying my answer from the x-post to SO)

I would suggest using Kafka Connect and its Elasticsearch sink. I actually presented on exactly this subject last night :slight_smile: Here are the slides.

You can see a detailed example here.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.