How to do upsert in ElasticSearch 5.3.2 using Spark Structured Streaming 2.3.0?


(Kant Kodali) #1

Hi,

I am wondering how to do an upsert in Elasticsearch 5.3.2 using Spark Structured Streaming 2.3.0. In other words, I have a streaming DataFrame where each row is a JSON string, and I want to write it to Elasticsearch, doing an upsert whenever my streaming DataFrame contains duplicate documents/rows.

I am using ForeachWriter and classes from compile group: 'org.elasticsearch.client', name: 'transport', version: '5.3.2', so it looks like this:

public class EsSink extends ForeachWriter<Row> {

    private TransportClient client;
    private BulkProcessor bulkProcessor;

    public EsSink(String cluster, String host, int port) throws UnknownHostException {
        Settings settings = Settings.builder()
                .put("cluster.name", cluster).build();
        String[] elasticSearchIps = host.split(",");
        InetSocketTransportAddress[] inetSocketTransportAddresses = new InetSocketTransportAddress[elasticSearchIps.length];
        for (int i = 0; i < elasticSearchIps.length; i++) {
            inetSocketTransportAddresses[i] = new InetSocketTransportAddress(InetAddress.getByName(elasticSearchIps[i]), port);
        }
        this.client = new PreBuiltTransportClient(settings)
                .addTransportAddresses(inetSocketTransportAddresses);

        this.bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {}

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {}

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {}
                })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }

    @Override
    public boolean open(long l, long l1) {
        return true;
    }

    @Override
    public void process(Row row) {
        String[] fieldNames = row.schema().fieldNames();
        Seq<String> fieldNamesSeq = JavaConverters.asScalaIteratorConverter(Arrays.asList(fieldNames).iterator()).asScala().toSeq();
        String jsonDocument = row.getValuesMap(fieldNamesSeq).toString();
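        // Row.get() only accepts an ordinal, so look the id column up by name and convert it to a String.
        String id = String.valueOf((Object) row.getAs("id"));
        IndexRequest indexRequest = Requests.indexRequest("hello").type("foo").id(id).source(jsonDocument, XContentType.JSON);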
        this.bulkProcessor.add(indexRequest);
    }

    @Override
    public void close(Throwable throwable) {
        this.bulkProcessor.close();
        this.client.close();
    }
}

As you can see, I am trying to do upserts based on id, but I am not sure whether I should use the transport client or elasticsearch-hadoop in the first place.


(James Baiera) #2

ES-Hadoop supports Spark Structured Streaming, as well as submitting bulk requests as upsert actions. Is there anything you are thinking about/needing that might not be covered by ES-Hadoop?
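For reference, here is a minimal sketch of what an upsert action looks like in the Elasticsearch bulk API: an `update` action line followed by a body that carries the document with `doc_as_upsert` set to true. The class and method names (`BulkUpsertLines`, `quote`, `upsertAction`) are hypothetical, the index `hello` and type `foo` come from the question, and the sketch assumes the document is already a valid JSON object string. It illustrates the request shape only and is not meant to reproduce exactly what ES-Hadoop sends.

```java
public final class BulkUpsertLines {

    private BulkUpsertLines() {}

    // Escapes backslashes and double quotes so the value can sit inside a JSON string.
    // Assumption: values contain no control characters.
    public static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the two newline-terminated lines of one bulk "update" action.
    // With doc_as_upsert=true the doc is inserted when the id is missing,
    // otherwise it is merged into the existing document.
    public static String upsertAction(String index, String type, String id, String jsonDoc) {
        String action = "{\"update\":{\"_index\":" + quote(index)
                + ",\"_type\":" + quote(type)
                + ",\"_id\":" + quote(id) + "}}";
        String body = "{\"doc\":" + jsonDoc + ",\"doc_as_upsert\":true}";
        return action + "\n" + body + "\n";
    }

    public static void main(String[] args) {
        // Hypothetical document, for illustration only.
        System.out.print(upsertAction("hello", "foo", "1", "{\"id\":\"1\",\"value\":\"a\"}"));
    }
}
```

Sending the same id twice with this action shape updates the existing document instead of creating a duplicate, which is the behaviour the question is after.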


(Kant Kodali) #3
  1. If I want to do an upsert based on one of the fields in my document, how would I do it in append mode?
  2. How are upsert actions supported without support for update mode?

(Kant Kodali) #4

I will probably answer #1 myself; please correct me if I am wrong.

df
  .writeStream()
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(1000))
  .format("org.elasticsearch.spark.sql")
  .option("checkpointLocation", checkpoint)
  .option("es.mapping.id", "<column in the dataframe>")
  // The Elasticsearch resource is always "index/type", so use "/" rather than the OS-specific File.separator.
  .start(this.sink.getIndex() + "/" + this.sink.getType());

For #2, can I use ForeachWriter in update mode for now (given that the elasticsearch-hadoop connector doesn't support update mode)? It would be great if someone could share an example!


(James Baiera) #5

Apologies, I did not realize you meant UPDATE Mode. When I say that ES-Hadoop supports upserts in Structured Streaming, what I mean is that you can configure the connector to use update/upsert actions when using the APPEND mode in the stream.
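As a rough sketch of that configuration: the connector settings involved are `es.mapping.id` (the column used as the document id) and `es.write.operation` set to `upsert`. The helper below only assembles the option map, so it runs without Spark on the classpath. The class name `EsUpsertOptions`, the column name `id`, and the checkpoint path are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public final class EsUpsertOptions {

    private EsUpsertOptions() {}

    // Builds the writer options for an append-mode stream whose rows should be upserted by id.
    public static Map<String, String> build(String idColumn, String checkpointLocation) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("checkpointLocation", checkpointLocation);
        // Column whose value becomes the document _id.
        options.put("es.mapping.id", idColumn);
        // Ask the connector to send upsert actions instead of plain index actions.
        options.put("es.write.operation", "upsert");
        return options;
    }

    public static void main(String[] args) {
        // Hypothetical column name and checkpoint path, for illustration only.
        build("id", "/tmp/checkpoint").forEach((k, v) -> System.out.println(k + "=" + v));
        // In the streaming job the map would be passed along the lines of:
        //   df.writeStream().outputMode("append").format("org.elasticsearch.spark.sql")
        //     .options(EsUpsertOptions.build("id", checkpoint)).start("hello/foo");
    }
}
```

The stream itself stays in append mode; only the action sent to Elasticsearch for each row changes from index to upsert.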

Update mode from Structured Streaming is not yet supported in ES-Hadoop. It doesn't seem too hard to get there, but it needs time to build out adequate testing (PRs welcome!). Update mode has more to it than just triggering an upsert to Elasticsearch for each piece of data. It also defines the approach that the stream employs for data collection: rows are accumulated in a working data set and only those rows that change are sent to the sinks to be persisted.

