ES 1.7.1 bulk insert problems

Hi there, this seems really odd to me.

I'm using JDK 1.8 on a Mac with 16 GB of RAM.

ES is running with ES_HEAP_SIZE set to 4 GB.

The client application has 3 GB of heap.

I'm loading a small CSV file (470k records) into ES. The problem is that the bulk indexing gets slower over time, and usually around 170k records I get an OOM error.

Bigdesk shows that my cluster never uses more than 600 MB of heap (out of 4 GB).

JVisualVM also shows that my client never uses more than 500 MB.

I'm not sure what could be causing this.

I'm using the transport client, by the way:

public boolean loadData(File input){
    logger.info("Processing file {}", input.getName());
    Long start = System.currentTimeMillis();
    try {
        CSVParser parser = CSVParser.parse(input, Charset.defaultCharset(), CSVFormat.EXCEL.withHeader());

        BulkRequestBuilder bulkRequest = client.prepareBulk();
        int total = 0;
        for(CSVRecord record : parser){
            
            XContentBuilder source = jsonBuilder().startObject();


            setValue(source,"uniqueCarrier", record.get("UniqueCarrier"));
            setValue(source,"carrier", record.get("Carrier"));
            setValue(source,"airlineId", record.get("AirlineID"));
            setValue(source,"flightDate", record.get("FlightDate"));
            setValue(source,"tailNum", record.get("TailNum"));
            setValue(source,"flightNum", record.get("FlightNum"));
            setValue(source,"origin", record.get("Origin"));
            setValue(source,"originCityName", record.get("OriginCityName"));
            setValue(source,"originState", record.get("OriginState"));
            setValue(source,"dest", record.get("Dest"));
            setValue(source,"destCityName", record.get("DestCityName"));
            setValue(source,"destState", record.get("DestState"));
            setValue(source,"crsDepTime", record.get("CRSDepTime"));
            setValue(source,"depTime", record.get("DepTime"));
            setValue(source,"depDelay", record.get("DepDelay"));
            setValue(source,"taxiOut", record.get("TaxiOut"));
            setValue(source,"wheelsOff", record.get("WheelsOff"));
            setValue(source,"wheelsOn", record.get("WheelsOn"));
            setValue(source,"taxiIn", record.get("TaxiIn"));
            setValue(source,"crsArrivalTime", record.get("CRSArrTime"));
            setValue(source,"arrivalTime", record.get("ArrTime"));
            setValue(source,"arrivalDelay", record.get("ArrDelay"));
            setValue(source,"cancelled", record.get("Cancelled"));
            setValue(source,"diverted", record.get("Diverted"));
            setValue(source,"crsElapsedTime", record.get("CRSElapsedTime"));
            setValue(source,"actualElapsedTime", record.get("ActualElapsedTime"));
            setValue(source,"airTime", record.get("AirTime"));
            setValue(source,"distance", record.get("Distance"));
            setValue(source,"carrierDelay", record.get("CarrierDelay"));
            setValue(source,"weatherDelay", record.get("WeatherDelay"));
            setValue(source,"nasDelay", record.get("NASDelay"));
            setValue(source,"securityDelay", record.get("SecurityDelay"));
            setValue(source,"aircraftDelay", record.get("LateAircraftDelay"));
            source.endObject();

            bulkRequest.add(client.prepareIndex("transportation", "departures", UUID.randomUUID().toString()).setSource(source));

            total++;

            if(total%2000==0) {
                System.out.println("Flushing " + total);
                bulkRequest.execute().actionGet();
            }

        }
        System.out.println("Flushing " + total);
        bulkRequest.execute().actionGet();

        parser.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
    Long end = System.currentTimeMillis();
    logger.info("Finished indexing file {} in {} ms",input.getName(),(end-start));
    return true;
}

According to Bigdesk, the bulk size ranges from 3 to 7 MB.

What makes this hard to diagnose is that, according to JVisualVM, I'm nowhere near an OOM.

Any ideas?

You never evaluate the BulkResponse returned by actionGet(), so you do not know whether a bulk request succeeded (see hasFailures()) before you send the next one. This will bog a cluster down.

See org.elasticsearch.action.bulk.BulkProcessor for the correct usage.
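As a rough illustration of the cycle that BulkProcessor automates (collect documents, flush at a threshold, look at the result), here is a plain-Java sketch. It does not use any Elasticsearch classes: BatchSender and BatchingLoader are hypothetical names made up for this example, and the sender's return value stands in for whatever failure information the real BulkResponse carries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the code that ships one batch and
// returns how many of its items failed.
interface BatchSender {
    int send(List<String> batch);
}

// Simplified model of the accumulate / flush / check cycle.
// This is NOT the BulkProcessor implementation, only the idea behind it.
final class BatchingLoader {
    private final BatchSender sender;
    private final int flushEvery;
    private List<String> pending = new ArrayList<>();
    private int failedItems = 0;

    BatchingLoader(BatchSender sender, int flushEvery) {
        this.sender = sender;
        this.flushEvery = flushEvery;
    }

    // Buffer one document and flush once the threshold is reached.
    void add(String doc) {
        pending.add(doc);
        if (pending.size() >= flushEvery) {
            flush();
        }
    }

    // Send whatever is buffered; a no-op when the buffer is empty.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<String> batch = pending;
        pending = new ArrayList<>();        // start a fresh batch for the next round
        failedItems += sender.send(batch);  // record the outcome instead of discarding it
    }

    int failedItems() {
        return failedItems;
    }
}

final class BatchingDemo {
    public static void main(String[] args) {
        // The sender here only prints; a real one would talk to the cluster.
        BatchingLoader loader = new BatchingLoader(batch -> {
            System.out.println("sending " + batch.size() + " docs");
            return 0;
        }, 2);
        for (int i = 0; i < 5; i++) {
            loader.add("doc" + i);
        }
        loader.flush(); // send the remainder
        System.out.println("failed items: " + loader.failedItems());
    }
}
```

With a threshold of 2 and five documents, the demo sends batches of 2, 2 and 1. The real BulkProcessor reports results through its Listener callbacks rather than a return value, so treat this only as a picture of the responsibilities involved.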

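Also, you never re-create the BulkRequestBuilder inside the loop, which is a fatal mistake: execute() does not clear the builder, so every flush resends all documents added so far and each request is larger than the last. Assign a fresh client.prepareBulk() to bulkRequest after each execute().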
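To get a feel for how costly it is to reuse one bulk request across flushes, the plain-Java sketch below counts the documents sent for the numbers in the question (470k records, a flush every 2000). It assumes that executing the request does not empty it; BulkGrowth and its methods are hypothetical names for this illustration and no Elasticsearch classes are involved.

```java
// Plain-Java model of the loader's flush logic; counts documents only.
final class BulkGrowth {

    // Documents sent when the same batch is executed repeatedly without a reset.
    static long sentWithoutReset(int records, int flushEvery) {
        long sent = 0;
        int pending = 0;                 // documents held by the never-reset batch
        for (int i = 1; i <= records; i++) {
            pending++;
            if (i % flushEvery == 0) {
                sent += pending;         // every earlier document goes out again
            }
        }
        return sent + pending;           // the flush after the loop resends everything once more
    }

    // Documents sent when a fresh batch is started after every flush.
    static long sentWithReset(int records, int flushEvery) {
        long sent = 0;
        int pending = 0;
        for (int i = 1; i <= records; i++) {
            pending++;
            if (i % flushEvery == 0) {
                sent += pending;
                pending = 0;             // models bulkRequest = client.prepareBulk()
            }
        }
        return sent + pending;           // only the remainder is left for the last flush
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + sentWithoutReset(470_000, 2_000)); // 55930000
        System.out.println("with reset:    " + sentWithReset(470_000, 2_000));    // 470000
    }
}
```

Under that assumption the loader pushes almost 56 million documents through the client to index 470k, and by the 170k mark a single request carries 170,000 documents rather than 2,000. With the reset in place the last batch can be empty when the record count is a multiple of the flush size, so it is worth guarding the final execute() with a check such as bulkRequest.numberOfActions() > 0.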