RestClient always gives “I/O reactor status: STOPPED”

I followed this thread but not able to resolve this issue I have a rest high level client and Elasticsearch 7.2 I am trying to bulk insert and keep getting this error -"java.lang.IllegalStateException: Request cannot be executed; I/O reactor status: STOPPED". Could anyone please help in getting me out of this?

Here I am pasting the code snippet of my actual code ...

public class ElasticSearchLoader
extends BaseLoader
{

private RestHighLevelClient client;
private BulkProcessor bulkProcessor;

private HashMap<String, ElasticFieldMapper> fieldMappers = new HashMap<String, ElasticFieldMapper>();

@Override
protected void performBegin() throws InitializeException
{
super.performBegin();

client = ElasticSearchHelper
    .makeConnection(clusterName.stringValue(), server.stringValue(), port.intValue());

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
   
    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
    	for (BulkItemResponse resp : response.getItems()) {
          if (resp.isFailed()) {
            System.out.println(resp.getFailureMessage());
            getResults().reportProblem(
                "Elastic Search Load Error",
                Severity.Error,
                resp.getFailureMessage(),
                null);

          }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
    	  context.log().warn("Error loading bulk elastic search requests", failure);
    }


@Override
public void beforeBulk(long executionId, BulkRequest request) {
  context.log().info("Creating bulk processor.");
  context.log().info("Number of actions: " + request.numberOfActions());
}
};


 bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build();

BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);


builder.setBulkActions(bulkActions.intValue()); 
builder.setBulkSize(new ByteSizeValue(bulkSize.intValue(), ByteSizeUnit.MB));
builder.setFlushInterval(TimeValue.timeValueSeconds(flushSeconds.intValue()));
builder.setConcurrentRequests(concurrentRequests.intValue()).build();
builder.setBackoffPolicy(
        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));

setFieldMappers();

// if (!indexExists()) {
// setESFieldMapping();
// }
context.log().info(
"Loading elastic search; server: "
+ server.stringValue()
+ ", cluster: "
+ clusterName.stringValue()
+ ", index: "
+ docIndex.stringValue()
+ ", type: "
+ docType.stringValue());

}

@Override
public void performLoad(Record item) throws InvalidRecordException, ETLSystemException
{
try {
XContentBuilder builder = jsonBuilder().startObject();
for (Parameter field : fields.getParameters()) {
String name = field.getNamedValue("name").stringValue();

   // System.out.println("I am the E "+ name);
    ElasticFieldMapper e = fieldMappers.get(name);

   
    
    
    Object value = field.getValueExpression().value(item);
  //  System.out.println();
    if (e.getType() != null) {
      value = TypeHelper.cast(value, e.getType());
  }
    builder.field(name, value);
  }
  builder.endObject();


  bulkProcessor.add(new IndexRequest(docIndex.stringValue(item).toLowerCase()).id(docId.stringValue(item)).source(builder));
  
}
catch (IOException ex) {
  throw new InvalidRecordException("Unable to load record", ex);
}

}

@Override
public void performComplete() throws IOException, CompleteException
{
bulkProcessor.flush();

try {
    bulkProcessor.awaitClose(waitCloseMinutes.intValue(), TimeUnit.MINUTES);
  }
  catch (InterruptedException ex) {
    throw new CompleteException("Unable to close the bulk processor", ex);
  }
  finally {
    client.close();
  }

super.performComplete();

}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.