Hi there! Sorry for the duplicate topic; I somehow managed to post an
incomplete version earlier.
I'm trying to do a one-time index of about 800,000 records into an
Elasticsearch instance, but I'm having trouble: the job consistently fails
at around 200,000 records, and in the Elasticsearch Head plugin my index
goes offline and becomes unrecoverable.
For now, Elasticsearch is running on a VM on my personal machine.
VM Config:
Ubuntu Server 14.04 64-Bit
8 GB RAM
2 Processors
32 GB SSD
Java
java version "1.7.0_65"
OpenJDK Runtime Environment (IcedTea 2.5.1) (7u65-2.5.1-4ubuntu1~0.14.04.2)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)
Elasticsearch is mostly using the defaults. This is the output of:
curl http://localhost:9200/_nodes/process?pretty
{
  "cluster_name" : "property_transaction_data",
  "nodes" : {
    "KlFkO_qgSOKmV_jjj5xeVw" : {
      "name" : "Marvin Flumm",
      "transport_address" : "inet[/192.168.133.131:9300]",
      "host" : "ubuntu-es",
      "ip" : "127.0.1.1",
      "version" : "1.3.2",
      "build" : "dee175d",
      "http_address" : "inet[/192.168.133.131:9200]",
      "process" : {
        "refresh_interval_in_millis" : 1000,
        "id" : 1092,
        "max_file_descriptors" : 65535,
        "mlockall" : true
      }
    }
  }
}
I adjusted ES_HEAP_SIZE to 512mb.
I'm using the following code to pull data from SQL Server and index it. Can
someone give me a hand and tell me what I might be doing wrong?
In the code below, PropertyGeneralInfoRow is just a class to represent a
row coming from the SQL view.
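For context, here's a minimal sketch of what that class looks like. It only shows the fields the indexing code actually touches, and the field types are assumed; the concatenation helper mirrors the full-address string the loop builds inline:

```java
// Minimal sketch of PropertyGeneralInfoRow (fields limited to what the
// indexing code uses; types assumed). The small geocode class just wraps
// the [lon, lat] pair that gets serialized for Elasticsearch.
class geocode {
    private Double[] location;
    public void setLocation(Double[] loc) { location = loc; }
    public Double[] getLocation() { return location; }
}

class PropertyGeneralInfoRow {
    private Long property_id;
    private String propertykey_tx, city_tx, stateprov_cd, country_tx, postalcode_tx;
    private Double lon_dbl, lat_dbl;
    private geocode geocode;           // [lon, lat] pair for ES
    private String fulladdressstring;  // comma-separated address, built at index time

    public Long getProperty_id() { return property_id; }
    public String getPropertykey_tx() { return propertykey_tx; }
    public void setPropertykey_tx(String v) { propertykey_tx = v; }
    public String getCity_tx() { return city_tx; }
    public void setCity_tx(String v) { city_tx = v; }
    public String getStateprov_cd() { return stateprov_cd; }
    public void setStateprov_cd(String v) { stateprov_cd = v; }
    public String getCountry_tx() { return country_tx; }
    public void setCountry_tx(String v) { country_tx = v; }
    public String getPostalcode_tx() { return postalcode_tx; }
    public void setPostalcode_tx(String v) { postalcode_tx = v; }
    public Double getLon_dbl() { return lon_dbl; }
    public Double getLat_dbl() { return lat_dbl; }
    public void setGeocode(geocode g) { this.geocode = g; }
    public void setFulladdressstring(String s) { fulladdressstring = s; }

    // Same concatenation the indexing loop performs inline before setting
    // fulladdressstring.
    public String buildFullAddress() {
        return propertykey_tx + ", " + city_tx + ", " + stateprov_cd
                + ", " + country_tx + ", " + postalcode_tx;
    }
}
```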
package com.rca.index;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rca.database.DBConnection;
import com.rca.database.PropertyGeneralInfoRow;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.io.IOException;
import java.util.List;

/**
 * Created by jpetersen on 8/1/14.
 */
public class Indexer {

    private static final Logger logger = LogManager.getLogger("ESBulkUploader");

    public static void main(String[] args) throws IOException, NoSuchFieldException {
        DBConnection dbConn = new DBConnection("");
        String query = "SELECT TOP 300000 * FROM vw_PropertyGeneralInfo WHERE Country_id = 1";

        System.out.println("getting data");
        List<PropertyGeneralInfoRow> pgiTable = dbConn.ExecuteQueryWithoutParameters(query);
        System.out.println("got data");

        ObjectMapper mapper = new ObjectMapper();
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", "property_transaction_data").build();
        Client client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("192.168.133.131", 9300));

        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                System.out.println("about to index " + request.numberOfActions() + " records.");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                System.out.println("successfully indexed " + request.numberOfActions()
                        + " records in " + response.getTook() + ".");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                System.out.println("failure somewhere on " + request.toString());
                failure.printStackTrace();
                logger.warn("failure on " + request.toString());
            }
        }).setBulkActions(500).setConcurrentRequests(1).build();

        for (int i = 0; i < pgiTable.size(); i++) {
            // prep location field
            PropertyGeneralInfoRow pgiRow = pgiTable.get(i);
            Double[] location = {pgiRow.getLon_dbl(), pgiRow.getLat_dbl()};
            geocode geocode = new geocode();
            geocode.setLocation(location);
            pgiRow.setGeocode(geocode);

            // prep full address string
            pgiRow.setFulladdressstring(pgiRow.getPropertykey_tx() + ", " + pgiRow.getCity_tx()
                    + ", " + pgiRow.getStateprov_cd() + ", " + pgiRow.getCountry_tx()
                    + ", " + pgiRow.getPostalcode_tx());

            String jsonRow = mapper.writeValueAsString(pgiRow);
            if (jsonRow != null && !jsonRow.isEmpty() && !jsonRow.equals("{}")) {
                bulkProcessor.add(new IndexRequest("rcapropertydata", "rcaproperty")
                        .source(jsonRow.getBytes()));
                // bulkProcessor.add(client.prepareIndex("rcapropertydata", "rcaproperty").setSource(jsonRow));
            } else {
                // don't add null strings..
                try {
                    System.out.println(pgiRow.toString());
                } catch (Exception e) {
                    System.out.println("Some error in toString() ...");
                }
                System.out.println("Some json output was null. -- " + pgiRow.getProperty_id().toString());
            }
        }

        bulkProcessor.flush();
        bulkProcessor.close();
    }
}
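For what it's worth, the null/empty-JSON guard in the loop is just this logic; factored into a small helper (the helper name is mine, not from any library) so it's easier to see what gets skipped:

```java
// Same guard the indexing loop applies before handing a document to the
// BulkProcessor: skip nulls, empty strings, and rows that serialized to "{}".
// The class and method names here are illustrative only.
class JsonGuard {
    static boolean shouldIndex(String jsonRow) {
        return jsonRow != null && !jsonRow.isEmpty() && !jsonRow.equals("{}");
    }
}
```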
--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.