Some Bulk Indexing Problems


(Joshua P) #1

Hi there! Sorry for posting two topics; I somehow managed to submit an
incomplete post.

I'm trying to do a one-time index of about 800,000 records into an instance
of Elasticsearch, but I'm having a bit of trouble: it consistently fails at
around 200,000 records. Looking at it in the Elasticsearch Head plugin, my
index goes offline and becomes unrecoverable.

For now, I have it running on a VM on my personal machine.

VM Config:
Ubuntu Server 14.04 64-Bit
8 GB RAM
2 Processors
32 GB SSD

Java
java version "1.7.0_65"
OpenJDK Runtime Environment (IcedTea 2.5.1) (7u65-2.5.1-4ubuntu1~0.14.04.2)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)

Elasticsearch is using mostly the defaults. This is the output of:
curl http://localhost:9200/_nodes/process?pretty
{
  "cluster_name" : "property_transaction_data",
  "nodes" : {
    "KlFkO_qgSOKmV_jjj5xeVw" : {
      "name" : "Marvin Flumm",
      "transport_address" : "inet[/192.168.133.131:9300]",
      "host" : "ubuntu-es",
      "ip" : "127.0.1.1",
      "version" : "1.3.2",
      "build" : "dee175d",
      "http_address" : "inet[/192.168.133.131:9200]",
      "process" : {
        "refresh_interval_in_millis" : 1000,
        "id" : 1092,
        "max_file_descriptors" : 65535,
        "mlockall" : true
      }
    }
  }
}

I adjusted ES_HEAP_SIZE to 512mb.

I'm using the following code to pull data from SQL Server and index it. Can
someone give me a hand and tell me what I might be doing wrong?

In the code below, PropertyGeneralInfoRow is just a class to represent a
row coming from the SQL view.

package com.rca.index;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rca.database.DBConnection;
import com.rca.database.PropertyGeneralInfoRow;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.io.IOException;
import java.util.List;
import org.apache.logging.log4j.LogManager;

/**
 * Created by jpetersen on 8/1/14.
 */
public class Indexer {

    private static final Logger logger = LogManager.getLogger("ESBulkUploader");

    public static void main(String[] args) throws IOException, NoSuchFieldException {

        DBConnection dbConn = new DBConnection("");

        String query = "SELECT TOP 300000 * FROM vw_PropertyGeneralInfo WHERE Country_id = 1";

        System.out.println("getting data");
        List<PropertyGeneralInfoRow> pgiTable = dbConn.ExecuteQueryWithoutParameters(query);
        System.out.println("got data");

        ObjectMapper mapper = new ObjectMapper();

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", "property_transaction_data")
                .build();

        Client client = new TransportClient(settings).addTransportAddress(
                new InetSocketTransportAddress("192.168.133.131", 9300));

        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                System.out.println("about to index " + request.numberOfActions() + " records.");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                System.out.println("successfully indexed " + request.numberOfActions()
                        + " records in " + response.getTook() + ".");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                System.out.println("failure somewhere on " + request.toString());
                failure.printStackTrace();
                logger.warn("failure on " + request.toString());
            }
        }).setBulkActions(500).setConcurrentRequests(1).build();

        for (int i = 0; i < pgiTable.size(); i++) {
            // prep location field
            PropertyGeneralInfoRow pgiRow = pgiTable.get(i);

            Double[] location = {pgiRow.getLon_dbl(), pgiRow.getLat_dbl()};

            geocode geocode = new geocode();
            geocode.setLocation(location);
            pgiRow.setGeocode(geocode);

            // prep full address string
            pgiRow.setFulladdressstring(pgiRow.getPropertykey_tx() + ", "
                    + pgiRow.getCity_tx() + ", " + pgiRow.getStateprov_cd()
                    + ", " + pgiRow.getCountry_tx() + ", " + pgiRow.getPostalcode_tx());

            String jsonRow = mapper.writeValueAsString(pgiRow);

            if (jsonRow != null && !jsonRow.isEmpty() && !jsonRow.equals("{}")) {
                bulkProcessor.add(new IndexRequest("rcapropertydata", "rcaproperty")
                        .source(jsonRow.getBytes()));
                // bulkProcessor.add(client.prepareIndex("rcapropertydata", "rcaproperty").setSource(jsonRow));
            } else {
                // don't add null strings..
                try {
                    System.out.println(pgiRow.toString());
                } catch (Exception e) {
                    System.out.println("Some error in toString() ...");
                }
                System.out.println("Some json output was null. -- "
                        + pgiRow.getProperty_id().toString());
            }
        }

        bulkProcessor.flush();
        bulkProcessor.close();
    }
}
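
For anyone following the batching in the code above: setBulkActions(500) asks the BulkProcessor to send a bulk request each time 500 actions have accumulated, and the final flush() sends whatever is left over. The sketch below only illustrates that count-based pattern in plain Java with no Elasticsearch dependency. CountingBatcher and its methods are hypothetical names invented for this illustration, not part of the Elasticsearch API, and the sketch does not model the processor's other triggers or its concurrency.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for count-based batching.
 * This is NOT an Elasticsearch class; it only mimics the "flush every N adds" idea.
 */
public class CountingBatcher {

    private final int batchSize;
    private final List<String> pending = new ArrayList<String>();
    private final List<Integer> flushedSizes = new ArrayList<Integer>();

    public CountingBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Queue one document; flush automatically once batchSize documents are pending. */
    public void add(String doc) {
        pending.add(doc);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** "Send" the pending documents: here we only record how many were in the batch. */
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedSizes.add(pending.size());
        pending.clear();
    }

    /** Sizes of all batches flushed so far, in order (defensive copy). */
    public List<Integer> flushedSizes() {
        return new ArrayList<Integer>(flushedSizes);
    }

    public static void main(String[] args) {
        CountingBatcher batcher = new CountingBatcher(500);
        for (int i = 0; i < 1234; i++) {
            batcher.add("{\"id\":" + i + "}");
        }
        batcher.flush(); // send the remainder, like the final bulkProcessor.flush()
        System.out.println(batcher.flushedSizes()); // prints [500, 500, 234]
    }
}
```

With 1,234 documents and a batch size of 500, two full batches go out during the loop and the trailing 234 only go out on the explicit flush, which is why the loop above is followed by a flush before close.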

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/864372f1-7a1b-4f9f-aac0-87688b60ccd3%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


(vineeth mohan-2) #2

Hello Joshua,

Please refrain from posting the same question twice.
If you need to add more information, just reply to the original
thread.

Thanks
Vineeth

On Tue, Sep 9, 2014 at 7:54 PM, Joshua P jpetersen841@gmail.com wrote:


(system) #3