Hi,
I am wondering how to do upserts in Elasticsearch 5.3.2 using Spark Structured Streaming 2.3.0. In other words, I have a streaming DataFrame where each row is a JSON document, and I want to write it to Elasticsearch, doing an upsert whenever there are duplicate documents/rows (same id) in the streaming DataFrame.
I am using a ForeachWriter and classes from compile group: 'org.elasticsearch.client', name: 'transport', version: '5.3.2', so it looks like this:
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class EsSink extends ForeachWriter<Row> {
    private final String cluster;
    private final String host;
    private final int port;
    // TransportClient is not serializable, so the connection is set up per
    // partition in open() rather than in the constructor: the constructor
    // runs on the driver, and the writer is then serialized to the executors.
    private transient TransportClient client;
    private transient BulkProcessor bulkProcessor;

    public EsSink(String cluster, String host, int port) {
        this.cluster = cluster;
        this.host = host;
        this.port = port;
    }

    @Override
    public boolean open(long partitionId, long version) {
        Settings settings = Settings.builder()
                .put("cluster.name", cluster).build();
        String[] elasticSearchIps = host.split(",");
        InetSocketTransportAddress[] addresses = new InetSocketTransportAddress[elasticSearchIps.length];
        try {
            for (int i = 0; i < elasticSearchIps.length; i++) {
                addresses[i] = new InetSocketTransportAddress(InetAddress.getByName(elasticSearchIps[i]), port);
            }
        } catch (UnknownHostException e) {
            throw new RuntimeException("cannot resolve Elasticsearch hosts: " + host, e);
        }
        this.client = new PreBuiltTransportClient(settings)
                .addTransportAddresses(addresses);
        this.bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {}
                    @Override
                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
                    @Override
                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
                })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
        return true;
    }
    @Override
    public void process(Row row) {
        // Build a plain Java map from the row and let the client serialize it;
        // getValuesMap(...).toString() produces Scala Map notation, not JSON.
        Map<String, Object> document = new HashMap<>();
        for (String fieldName : row.schema().fieldNames()) {
            document.put(fieldName, row.getAs(fieldName));
        }
        // Row.get(...) takes an int position, so look the id up by name.
        Object idValue = row.getAs("id");
        IndexRequest indexRequest = Requests.indexRequest("hello")
                .type("foo")
                .id(String.valueOf(idValue))
                .source(document);
        this.bulkProcessor.add(indexRequest);
    }
    @Override
    public void close(Throwable errorOrNull) {
        try {
            // close() does not wait for in-flight bulks; awaitClose flushes
            // the buffer and blocks until the last bulk request completes.
            this.bulkProcessor.awaitClose(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.client.close();
        }
    }
}
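Since an IndexRequest with an explicit id only gives me insert-or-replace, I was also thinking process() could enqueue an UpdateRequest for a true upsert. Here is a minimal sketch, assuming BulkProcessor in 5.3.2 accepts UpdateRequest and that docAsUpsert works as documented (same "hello"/"foo" index and type, and the same "id" column as above):

import org.elasticsearch.action.update.UpdateRequest;

@Override
public void process(Row row) {
    Map<String, Object> document = new HashMap<>();
    for (String fieldName : row.schema().fieldNames()) {
        document.put(fieldName, row.getAs(fieldName));
    }
    Object idValue = row.getAs("id");
    // docAsUpsert: insert the document when the id is new, otherwise merge
    // the given fields into the existing document (a partial update).
    UpdateRequest upsertRequest = new UpdateRequest("hello", "foo", String.valueOf(idValue))
            .doc(document)
            .docAsUpsert(true);
    this.bulkProcessor.add(upsertRequest);
}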
As you can see, I am trying to do upserts based on id, but I am not sure whether I should use the transport client or elasticsearch-hadoop in the first place.
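For comparison, this is roughly how I imagine the elasticsearch-hadoop route would look. My assumption (please correct me) is that the Structured Streaming sink (format "es") only exists in es-hadoop 6.x, while the 5.x connector only supports DStreams, so it may not even be an option against a 5.3.2 cluster. Hosts, checkpoint path and class name are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

public class EsHadoopSink {
    // df is the streaming DataFrame from the question; "hello/foo" matches
    // the index/type used by the ForeachWriter above.
    public static StreamingQuery startUpsertQuery(Dataset<Row> df) {
        return df.writeStream()
                .format("es")                                        // es-hadoop sink (6.x+?)
                .option("checkpointLocation", "/tmp/es-checkpoint")  // placeholder path
                .option("es.nodes", "host1,host2")                   // placeholder hosts
                .option("es.mapping.id", "id")                       // column used as the document _id
                .option("es.write.operation", "upsert")              // upsert instead of plain index
                .start("hello/foo");
    }
}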