Hi,
I am using the following code in PySpark to write data from Kafka into Elasticsearch:
import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json
from kafka import KafkaConsumer, KafkaClient
from pyspark.sql.functions import explode, split
import pandas as pd
from collections import OrderedDict
from datetime import date
conf = pyspark.SparkConf()
conf.setMaster('mesos://172.20.1.157:5050')
sc = pyspark.SparkContext(conf=conf)
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 2)
kafkaParams = {'metadata.broker.list': '172.20.1.163:9092', 'auto.offset.reset': 'smallest'} # kafka parameters
topics = ['json_topic']
kafka_stream = KafkaUtils.createDirectStream(ssc,topics,kafkaParams)
parsed = kafka_stream.map(lambda (k,v) : json.loads(v))
def function(x):
    y = x.collect()
    for d in y:
        print d
        rdd_json = json.dumps(d)
        print rdd_json
        rdd_json.write.format('org.elasticsearch.spark.sql').mode('append').option('es.index.auto.create','true').option('es.resource', 'sql6/swati').save()

parsed.foreachRDD(lambda x: function(x))
ssc.start()
ssc.awaitTermination()
ssc.stop()
The output of rdd_json looks like this:
{"Enrolment_Date": "2008-01-01", "Freq": 78, "Group": "Recorded Data"}
But I am not able to write it to my Elasticsearch cluster with the following line:
rdd_json.write.format('org.elasticsearch.spark.sql').mode('append').option('es.index.auto.create','true').option('es.resource', 'sql6/type').save()
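Is something along these lines the right direction instead? This is only a rough sketch of what I was thinking of trying, not code I have run: send_to_es is my own name, and the es.nodes value is just a placeholder for my Elasticsearch host. The idea is to turn each micro-batch back into a DataFrame (using the sqlContext defined above) before calling save(), since json.dumps only gives me a string.

def send_to_es(rdd):
    # skip empty micro-batches
    if not rdd.isEmpty():
        # rdd holds dicts (from json.loads); serialize back to JSON strings
        # and let Spark infer the schema
        df = sqlContext.read.json(rdd.map(json.dumps))
        df.write \
          .format('org.elasticsearch.spark.sql') \
          .mode('append') \
          .option('es.index.auto.create', 'true') \
          .option('es.resource', 'sql6/swati') \
          .option('es.nodes', 'MY_ES_HOST') \
          .save()

parsed.foreachRDD(send_to_es)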
Any ideas on how to write it into Elasticsearch, or is some other change in the code required?
Thanks in advance