I followed this thread but have not been able to resolve this issue. I have a REST high-level client and Elasticsearch 7.2. I am trying to bulk insert and keep getting this error: "java.lang.IllegalStateException: Request cannot be executed; I/O reactor status: STOPPED". Could anyone please help me resolve this?
Here is the relevant snippet from my actual code:
public class ElasticSearchLoader
extends BaseLoader
{
private RestHighLevelClient client;
private BulkProcessor bulkProcessor;
private HashMap<String, ElasticFieldMapper> fieldMappers = new HashMap<String, ElasticFieldMapper>();
@Override
protected void performBegin() throws InitializeException
{
super.performBegin();
client = ElasticSearchHelper
.makeConnection(clusterName.stringValue(), server.stringValue(), port.intValue());
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
for (BulkItemResponse resp : response.getItems()) {
if (resp.isFailed()) {
System.out.println(resp.getFailureMessage());
getResults().reportProblem(
"Elastic Search Load Error",
Severity.Error,
resp.getFailureMessage(),
null);
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
context.log().warn("Error loading bulk elastic search requests", failure);
}
@Override
public void beforeBulk(long executionId, BulkRequest request) {
context.log().info("Creating bulk processor.");
context.log().info("Number of actions: " + request.numberOfActions());
}
};
bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener).build();
BulkProcessor.Builder builder = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener);
builder.setBulkActions(bulkActions.intValue());
builder.setBulkSize(new ByteSizeValue(bulkSize.intValue(), ByteSizeUnit.MB));
builder.setFlushInterval(TimeValue.timeValueSeconds(flushSeconds.intValue()));
builder.setConcurrentRequests(concurrentRequests.intValue()).build();
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
setFieldMappers();
// if (!indexExists()) {
// setESFieldMapping();
// }
context.log().info(
"Loading elastic search; server: "
+ server.stringValue()
+ ", cluster: "
+ clusterName.stringValue()
+ ", index: "
+ docIndex.stringValue()
+ ", type: "
+ docType.stringValue());
}
@Override
public void performLoad(Record item) throws InvalidRecordException, ETLSystemException
{
try {
XContentBuilder builder = jsonBuilder().startObject();
for (Parameter field : fields.getParameters()) {
String name = field.getNamedValue("name").stringValue();
// System.out.println("I am the E "+ name);
ElasticFieldMapper e = fieldMappers.get(name);
Object value = field.getValueExpression().value(item);
// System.out.println();
if (e.getType() != null) {
value = TypeHelper.cast(value, e.getType());
}
builder.field(name, value);
}
builder.endObject();
bulkProcessor.add(new IndexRequest(docIndex.stringValue(item).toLowerCase()).id(docId.stringValue(item)).source(builder));
}
catch (IOException ex) {
throw new InvalidRecordException("Unable to load record", ex);
}
}
@Override
public void performComplete() throws IOException, CompleteException
{
bulkProcessor.flush();
try {
bulkProcessor.awaitClose(waitCloseMinutes.intValue(), TimeUnit.MINUTES);
}
catch (InterruptedException ex) {
throw new CompleteException("Unable to close the bulk processor", ex);
}
finally {
client.close();
}
super.performComplete();
}
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.