Hadoop-ES Structured Streaming-write Sink Partition Issue


(grifith) #1

Hello,

We have a Structured Streaming application that writes data from Spark to ES. The flow is as follows:

Kinesis stream (8 shards) ----> (Databricks library for Kinesis, readStream) ----> Spark Structured Streaming (transformations) ----> (es-hadoop library 6.2.1, writeStream) ----> ES

When we use the es-hadoop library as the ES write sink, the DataFrame repartition does not take effect. When we run the same write stream with the console sink instead, the shuffle does happen.

The number of tasks in stage 1 of the processing is always equal to the number of Kinesis stream shards. We have tried different ES configurations, and it still does not work as expected.

The code flow is as follows:

// Read stream (Databricks library for Kinesis)

val stream = spark.readStream
  .format("kinesis")
  .option("streamName", conf.aws.kinesisStream)
  .option("region", "eu-west-1")
  .option("awsAccessKey", conf.aws.accessKey)
  .option("awsSecretKey", conf.aws.secretKey)
  .option("initialPosition", conf.aws.kinesisPosition)
  .option("maxRecordsPerFetch", conf.databricks.maxRecordsPerFetch)
  .option("fetchBufferSize", conf.databricks.fetchBufferSize)
  .option("maxFetchDuration", conf.databricks.maxFetchDuration)
  .option("maxFetchRate", conf.databricks.maxFetchRate)
  .option("minFetchPeriod", conf.databricks.minFetchPeriod)
  .option("shardFetchInterval", conf.databricks.shardFetchInterval)
  .option("shardsPerTask", conf.databricks.shardsPerTask)
  .load()

// Repartition after the stream read
val mStream = stream
  .repartition(conf.spark.inPartitions.toInt)

// Data transformations (omitted)

// Write sink

eventsStream
  .writeStream
  .outputMode(OutputMode.Append) // Only mode for ES
  .format("org.elasticsearch.spark.sql") // es
  .option("es.nodes", conf.es.nodes)
  .option("es.nodes.wan.only", conf.es.wanOnly)
  .queryName("name")
  .option("checkpointLocation", streamCheckPointLoc)
  .option("es.batch.size.entries", conf.es.es_batch_size_entries)
  .option("es.batch.size.bytes", conf.es.es_batch_size_bytes)
  .option("es.batch.write.refresh", conf.es.es_batch_write_refresh)
  .option("es.input.max.docs.per.partition", conf.es.es_max_docs_per_partition)
  .option("es.batch.write.retry.count", conf.es.es_batch_write_retry_count)
  .start(conf.es.index)

Could you please let us know why the repartition is not happening when we use the es-hadoop library? With the built-in output sinks, the repartitioning and shuffle do happen.
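
For anyone who wants to check whether the repartition is present in the plan before the sink is attached, here is a minimal sketch. It is not our application code: it assumes Spark 2.3 or later and uses the built-in "rate" source as a stand-in for Kinesis, with 8 source partitions to mimic the 8 shards. The object name RepartitionPlanCheck, the helper repartitionedRateStream, and the partition counts are hypothetical and chosen only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RepartitionPlanCheck {

  // Builds a streaming DataFrame from the built-in "rate" test source
  // (a stand-in for the Kinesis source, which is an assumption of this sketch)
  // and repartitions it, mirroring the repartition step in the post.
  def repartitionedRateStream(
      spark: SparkSession,
      sourcePartitions: Int,
      targetPartitions: Int): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .option("numPartitions", sourcePartitions.toString)
      .load()
      .repartition(targetPartitions)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("repartition-plan-check")
      .getOrCreate()
    try {
      val df = repartitionedRateStream(spark, sourcePartitions = 8, targetPartitions = 16)
      // Prints the physical plan of the streaming DataFrame without starting a query.
      df.explain()
    } finally {
      spark.stop()
    }
  }
}
```

If the repartition is part of the plan, the printed plan would be expected to contain an Exchange step for the requested partition count. This only inspects the DataFrame before the sink is attached, so it does not by itself show what the es-hadoop sink does with the plan. Calling explain() on the StreamingQuery handle returned by start() may help compare the console and ES runs.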


(system) #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.