Pyspark write to Elasticsearch from Kafka


I am using the following code in pyspakr to write data into Elasticsearch from Kafka

import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json
from kafka import KafkaConsumer, KafkaClient
from pyspark.sql.functions import explode, split
import pandas as pd
from collections import OrderedDict
from datetime import date

conf = pyspark.SparkConf()
sc = pyspark.SparkContext(conf=conf)
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 2)
kafkaParams = {'': '', 'auto.offset.reset': 'smallest'} # kafka parameters
topics = ['json_topic']
kafka_stream = KafkaUtils.createDirectStream(ssc,topics,kafkaParams)
parsed = (k,v) : json.loads(v))
def function(x):
y = x.collect()
for d in y :
print d
rdd_json = json.dumps(d)
print rdd_json
rdd_json.write.format('org.elasticsearch.spark.sql').mode('append').option('','true').option('es.resource', 'sql6/swati').save()

parsed.foreachRDD(lambda x :function(x))

The output of rdd_json comes as below :-
{"Enrolment_Date": "2008-01-01", "Freq": 78, "Group": "Recorded Data"}

But I am not able to write it my elasticsearch cluster using the following code :-

rdd_json.write.format('org.elasticsearch.spark.sql').mode('append').option('','true').option('es.resource', 'sql6/type').save()

Any ideas as to how to write it into elasticsearch or is any change in the code is required?

Thanks in advance

@newbie_here Could you include some sort of error trace that you are seeing, or does the command fail with no feedback?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.