Kafka to ElasticSearch


(ES_MacTalk) #1

Hi,
We are trying to move data from a Kafka topic to Elasticsearch. We are getting data in JSON format in the Kafka topic. We plan to move this data to Elasticsearch and then visualize it using Kibana. We are currently using the Flume Elasticsearch sink with the default serializer, but we are not able to visualize the data in Kibana.

In Elasticsearch, the document looks like this:

{
  "_index": "handleindex6-2014-12-01",
  "_type": "bar_type",
  "_id": "AUoGQAHfOA15NKJPcqsy",
  "_score": 1,
  "_source": {
    "@message": "{\"handle\":\"speed\",\"mac\":\"WIN1223\",\"owner\":\"Sam\",\"schema ID\":\"12B007\",\"data\":\"19.66\",\"timestamp\":1366150981}",
    "@timestamp": "2014-12-01T14:27:45.509Z",
    "@fields": {
      "topic": "mtalk",
      "timestamp": "1417444065509"
    }
  }
}
We are seeing that the data is getting embedded within the message part of the JSON saved in Elasticsearch. Is this the right approach: Kafka --> Flume --> ES --> Kibana?
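For what it's worth, the original event is still recoverable: the serializer stored the whole Kafka message body as a single string in `@message`, so the inner JSON has to be decoded separately. A minimal Python sketch (the `source` dict mirrors the document above):

```python
import json

# _source as fetched from Elasticsearch (mirroring the document above)
source = {
    "@message": '{"handle":"speed","mac":"WIN1223","owner":"Sam",'
                '"schema ID":"12B007","data":"19.66","timestamp":1366150981}',
    "@timestamp": "2014-12-01T14:27:45.509Z",
}

# The event body arrived as one string, so decode the inner JSON explicitly.
event = json.loads(source["@message"])
print(event["handle"])  # -> speed
print(event["owner"])   # -> Sam
```

This is exactly why Kibana cannot see `handle`, `mac`, etc. as fields: to Elasticsearch, `@message` is one opaque string, not structured JSON.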


(Mungeol Heo) #2

There is another approach, listed below:
kafka -> logstash consumer (logstash-kafka) -> es
You can check out logstash-kafka at https://github.com/joekiller/logstash-kafka
I also tried the approach you mentioned above; however, for me, the ES sink of Flume was somewhat unstable. Anyway, you can try it.
Hope it helps.
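A minimal pipeline along those lines might look like the sketch below. This is only an illustration: the host names and index pattern are placeholders taken from this thread, and the input options follow the logstash-kafka plugin linked above. The `json` filter decodes the Kafka payload into top-level fields, which avoids the embedded-string problem from the first post.

```
input {
  kafka {
    zk_connect => "hosthere:2181"
    topic_id   => "mtalk"
  }
}

filter {
  # The Kafka payload is already JSON, so decode it into top-level fields.
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    host  => "elastic_host_here"
    index => "handleindex6-%{+YYYY-MM-dd}"
  }
}
```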



(sangala.hadoop@gmail.com) #3

I am also trying Kafka --> Flume --> Elasticsearch, but I am not able to get it working with the configuration below. Please share your Flume configuration.

flume1.sources = source1
flume1.channels = channel1
flume1.sinks = elasticsearch

flume1.channels.channel1.type = memory
flume1.channels.channel1.capacity = 10000000
flume1.channels.channel1.transactionCapacity = 1000

# For each source, channel, and sink, set standard properties

flume1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.source1.zookeeperConnect = hosthere:2181
flume1.sources.source1.topic = kafka_topicname_here
flume1.sources.source1.batchSize = 5
flume1.sources.source1.batchDurationMillis = 200
flume1.sources.source1.channels = channel1

flume1.sinks.elasticsearch.channel = channel1
flume1.sinks.elasticsearch.type=elasticsearch
flume1.sinks.elasticsearch.batchSize=100
flume1.sinks.elasticsearch.hostNames = elastic_host_here:9300
flume1.sinks.elasticsearch.indexName = foo_index
flume1.sinks.elasticsearch.indexType = bar_type
flume1.sinks.elasticsearch.ttl = 2d
flume1.sinks.elasticsearch.clusterName = myclustername_here
flume1.sinks.elasticsearch.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer


(sangala.hadoop@gmail.com) #4

I fixed it, and it is now working fine.

thanks,
Shekhar Reddy.


(sangala.hadoop@gmail.com) #5

Hi Mac,

I am using org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer in the Flume config, and the Kafka topic data is now coming in under the body field. Please share details if you have fixed this issue.

flume1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

{
  "_index": "foo_index-2015-05-25",
  "_type": "bar_type",
  "_id": "AU2L8-LiEl9820ruvxyR",
  "_version": 1,
  "_score": 1,
  "_source": {
    "body": "{customer: customer1, network: network1, domain: XXX, kpiGroup: kpiGroup1, vendor: vendor1, entityType: entityType1}",
    "timestamp": 1432572122700,
    "topic": "topic_name"
  }
}
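For Kibana to filter and aggregate on the individual fields, they need to land at the top level of `_source` rather than inside a single `body` string. A hypothetical target shape (not what the serializer produces by default) would be:

```
{
  "customer": "customer1",
  "network": "network1",
  "domain": "XXX",
  "kpiGroup": "kpiGroup1",
  "vendor": "vendor1",
  "entityType": "entityType1",
  "timestamp": 1432572122700,
  "topic": "topic_name"
}
```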


(system) #6