I'm trying to bulk index a batch of raw JSON records into Elasticsearch, and I need to set custom _id values for them. Indexing documents one at a time works: I call "new IndexQueryBuilder().withId(some_id_value)" and then the single-document "index" method. However, the "bulkIndex" method ignores the _id value I set.
Here's the code that ignores the ".withId" call:
package <omitted for safety>;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.stereotype.Service;
@Service
public class ESService {
    @Autowired
    private ElasticsearchOperations esOperations;
    public void index(String baseName, Map<Integer, String> jsonDocuments, String indexName, Long exp_time) {
        IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
        IndexOperations indexOps = esOperations.indexOps(indexCoordinates);
        if (!indexOps.exists()) {
            indexOps.create();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing the interruption
                Thread.currentThread().interrupt();
            }
        }
        // One IndexQuery per document; the map key is meant to become the document _id
        List<IndexQuery> indexQueries = jsonDocuments.keySet().stream()
                .map(id -> new IndexQueryBuilder()
                        .withSource(jsonDocuments.get(id))
                        .withId(id.toString()) // HERE IS THE IGNORED CALL
                        .withIndex(indexName)
                        .build())
                .collect(Collectors.toList());
        try {
            esOperations.bulkIndex(indexQueries, indexCoordinates);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
It would be useful (if not essential) to be able to set the _id of each individual record sent in the bulk request.
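For reference, the Elasticsearch _bulk REST API itself does accept a per-document _id: each document in the NDJSON body is preceded by an action/metadata line such as {"index":{"_index":"...","_id":"..."}}. The stdlib-only sketch below builds that body from the same Map<Integer, String> used in the service above, which may help when checking what a correct request should contain. The class and method names are hypothetical (not part of Spring Data), and it assumes every map value is a single-line JSON object.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Spring Data: builds a _bulk NDJSON body by hand.
public class BulkBodySketch {

    /**
     * Builds an NDJSON _bulk body in which every "index" action carries an explicit _id.
     * Assumes each map value is a single-line JSON object (NDJSON forbids raw newlines
     * inside a document) and that the map keys are the desired document ids.
     */
    static String buildBulkBody(String indexName, Map<Integer, String> jsonDocuments) {
        StringBuilder body = new StringBuilder();
        // TreeMap only makes the output order deterministic for this example.
        for (Map.Entry<Integer, String> entry : new TreeMap<>(jsonDocuments).entrySet()) {
            // Action/metadata line: "_id" sets the document id explicitly.
            // No escaping is done: index names cannot contain quotes and the ids are integers.
            body.append("{\"index\":{\"_index\":\"").append(indexName)
                .append("\",\"_id\":\"").append(entry.getKey()).append("\"}}\n");
            // Source line: the raw JSON document.
            body.append(entry.getValue()).append('\n');
        }
        // Every line, including the last one, ends with '\n' as the _bulk API requires.
        return body.toString();
    }

    public static void main(String[] args) {
        Map<Integer, String> docs = Map.of(2, "{\"title\":\"b\"}", 1, "{\"title\":\"a\"}");
        System.out.print(buildBulkBody("my-index", docs));
    }
}
```

Running main prints two action/source pairs, with "_id":"1" and "_id":"2" in the action lines. If the request Spring Data sends lacks these _id fields, Elasticsearch generates its own ids, which matches the symptom described above.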
dadoonet
(David Pilato)
August 11, 2024, 2:56pm
I think you are doing something wrong in the ElasticsearchOperations class.
Also, which Elasticsearch client version are you using?
High Level REST Client: 7.17.3 (in REST API compatibility mode against an 8.10.3 server)
Spring Boot: 3.0.13
spring-data-elasticsearch: 5.0.12
Below is my configuration:
package com.example.elasticdemo;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.sniff.SniffOnFailureListener;
import org.elasticsearch.client.sniff.Sniffer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchConfiguration;
import org.springframework.data.elasticsearch.client.erhlc.RestClients;
import org.springframework.data.elasticsearch.support.HttpHeaders;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@Configuration
public class ElasticsearchConfig1 extends ElasticsearchConfiguration {
    @Override
    public ClientConfiguration clientConfiguration() {
        // REST API compatibility headers so that a 7.x-style client can talk to the 8.x server
        HttpHeaders compatibilityHeaders = new HttpHeaders();
        compatibilityHeaders.add("Accept", "application/vnd.elasticsearch+json;compatible-with=7");
        compatibilityHeaders.add("Content-Type", "application/vnd.elasticsearch+json;compatible-with=7");
        return ClientConfiguration.builder()
                .connectedTo("host")
                .withBasicAuth("username", "password")
                .withDefaultHeaders(compatibilityHeaders)
                //.withHeaders(() -> compatibilityHeaders)
                .withConnectTimeout(Duration.ofMillis(5000))
                .withSocketTimeout(Duration.ofMillis(5000))
                .build();
    }
    // NOTE: as far as I can tell, ElasticsearchConfiguration (client.elc package) builds
    // ElasticsearchOperations on the new Java API client, so this RestHighLevelClient
    // bean is not the client that esOperations.bulkIndex(...) uses.
    @Bean
    //@Override
    public RestHighLevelClient restHighLevelClient() {
        RestHighLevelClient restHighLevelClient = RestClients.create(clientConfiguration()).rest();
        // Sniffer
        LogSniffOnFailureListener sniffOnFailureListener = new LogSniffOnFailureListener();
        Sniffer sniffer = Sniffer.builder(restHighLevelClient.getLowLevelClient())
                .setSniffAfterFailureDelayMillis(Math.toIntExact(TimeUnit.MINUTES.toMillis(1)))
                .build();
        // NOTE: the listener is never registered via RestClientBuilder#setFailureListener(...),
        // so onFailure() below will not be called when a node fails.
        sniffOnFailureListener.setSniffer(sniffer);
        return restHighLevelClient;
    }
    public static class LogSniffOnFailureListener extends SniffOnFailureListener {
        public LogSniffOnFailureListener() {
            super();
        }
        @Override
        public void onFailure(org.elasticsearch.client.Node node) {
            //LOGGER.warn("Node failed: " + node.getName());
            System.out.println("error----------------------------");
            super.onFailure(node);
        }
    }
}
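One way to take Spring Data out of the picture is to send a _bulk request with explicit _id values directly, using the same REST API compatibility headers as the configuration above. The plain-JDK sketch below only builds such a request with java.net.http; the class name, base URL and body are placeholders, and authentication is omitted. One assumption to verify against the Elasticsearch REST API compatibility docs: since a bulk body is NDJSON rather than JSON, I believe the matching compatible content type is the x-ndjson vendor variant used here, not the +json one set as a default header above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch (plain JDK, no Elasticsearch client): a POST /_bulk request
// carrying REST API compatibility headers. URL and body are placeholders.
public class CompatBulkRequestSketch {

    // Assumed vendor media types for NDJSON request bodies and JSON responses.
    static final String COMPAT_NDJSON = "application/vnd.elasticsearch+x-ndjson;compatible-with=7";
    static final String COMPAT_JSON = "application/vnd.elasticsearch+json;compatible-with=7";

    static HttpRequest bulkRequest(String baseUrl, String ndjsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_bulk"))
                .header("Content-Type", COMPAT_NDJSON) // the bulk body is NDJSON
                .header("Accept", COMPAT_JSON)         // the bulk response is JSON
                .POST(HttpRequest.BodyPublishers.ofString(ndjsonBody))
                .build();
    }

    public static void main(String[] args) {
        // One document with an explicit _id; the body must end with a newline.
        String body = "{\"index\":{\"_index\":\"my-index\",\"_id\":\"1\"}}\n{\"title\":\"a\"}\n";
        HttpRequest request = bulkRequest("http://localhost:9200", body);
        System.out.println(request.method() + " " + request.uri());
        // To actually send it (needs a reachable cluster and credentials):
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

If the documents indexed this way keep the ids from the action lines, the server side is fine and the problem lies in how the bulk request is built on the client side.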
dadoonet
(David Pilato)
August 12, 2024, 2:34am
You probably need to ask the Spring Data project, as the bulk code you are using is not provided by Elastic.
That said, I'd use the official Java API Client instead of the old High Level REST Client. I did not check whether the Spring Data team has switched to it, though I guess so.