Hi David
Below is the code please review and tell me where i m going wrong
package org;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.Base64;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
/*import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.format.ISODateTimeFormat;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;*/
public class Index implements Serializable {
private static final long serialVersionUID = 1L;
private static String index = null;
private static String type = null;
private String analyzer = null;
private BulkRequestBuilder bulk;
private Client client ;
public void setClient(Client client){
this.client=client;
}
private final long bulkSize = 0L;
public Index() {
this(SMDSearchProperties.INDEX_NAME, SMDSearchProperties.INDEX_TYPE_DOC, null);
}
public Index(String index, String type, String analyzer) {
super();
this.index = index;
this.type = type;
this.analyzer = analyzer;
}
/**
* @return the index
*/
public String getIndex() {
return index;
}
/**
* @param index the index to set
*/
public void setIndex(String index) {
this.index = index;
}
/**
* @return the type
*/
public String getType() {
return type;
}
/**
* @param type the type to set
*/
public void setType(String type) {
this.type = type;
}
/**
* @return the analyzer
*/
public String getAnalyzer() {
return analyzer;
}
/**
* @param analyzer the analyzer to set
*/
public void setAnalyzer(String analyzer) {
this.analyzer = analyzer;
}
@Override
public boolean equals(Object obj) {
if (obj == null) return false;
if (!(obj instanceof Index)) return false;
Index index = (Index) obj;
if (this.index != index.index && this.index != null && !this.index.equals(index.index)) return false;
if (this.type != index.type && this.type != null && !this.type.equals(index.type)) return false;
if (this.analyzer != index.analyzer && this.analyzer != null && !this.analyzer.equals(index.analyzer)) return false;
return true;
}
private boolean isMappingExist(String index, String type) {
ClusterState cs = client.admin().cluster().prepareState().setFilterIndices(index).execute().actionGet().getState();
IndexMetaData imd = cs.getMetaData().index(index);
if (imd == null) return false;
MappingMetaData mdd = imd.mapping(type);
if (mdd != null) return true;
return false;
}
public void pushMapping(){
boolean mappingExist = isMappingExist(index, type);
if (!mappingExist) {
System.out.println("Index Creating First Time...");
CreateIndexResponse createIndexResponse = new CreateIndexRequestBuilder( client.admin().indices(), index ).execute().actionGet();
XContentBuilder xbMapping = null;
try {
xbMapping = jsonBuilder().startObject()
.startObject(type).startObject("properties")
.startObject("file").field("type", "attachment")
.endObject()
.endObject().endObject().endObject();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
PutMappingResponse response = client.admin().indices()
.preparePutMapping(index)
.setIndices(index)
.setType(type)
.setSource(xbMapping)
.execute().actionGet();
}
else
System.out.println("Mapping Exist!!!");
}
public void indexFile(File file) throws Exception {
FileInputStream fileReader = new FileInputStream(file);
bulk = client.prepareBulk();
byte[] buffer = new byte[1024];
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int i = 0;
while (-1 != (i = fileReader.read(buffer))) {
bos.write(buffer, 0, i);
}
byte[] data = bos.toByteArray();
fileReader.close();
bos.close();
esIndex(index,
type,
SignTool.sign(file.getAbsolutePath()),
jsonBuilder()
.startObject()
.field(FsRiverUtil.DOC_FIELD_NAME, file.getName())
.field(FsRiverUtil.DOC_FIELD_DATE,
file.lastModified())
.field(FsRiverUtil.DOC_FIELD_PATH_ENCODED,
SignTool.sign(file.getParent()))
.startObject("file").field("_name", file.getName())
.field("content", new String(Base64.encodeBytes(data)))
.endObject().endObject());
}
private void esIndex(String index, String type, String id,
XContentBuilder xb) throws Exception {
System.out.println("\nin :: "+index+"\nty :: "+type+"\nid :: "+id+"\nxb :: "+xb);
System.out.println(client);
System.out.println(xb.string());
System.out.println("BULK :: "+bulk);
bulk.add(client.prepareIndex(index, type, id).setSource(xb));
commitBulkIfNeeded();
}
private void commitBulkIfNeeded() {
System.out.println(bulk.numberOfActions());
try {
if (bulk != null && bulk.numberOfActions() > 0 && bulk.numberOfActions() >= bulkSize) {
//if (logger.isDebugEnabled()) logger.debug("ES Bulk Commit is needed");
System.out.println("bulk start execute");
BulkResponse response = bulk.execute().actionGet();
System.out.println("bulk executed");
if (response.hasFailures()) {
//logger.warn("Failed to execute "
//+ response.buildFailureMessage());
System.out.println("loger failed 'comitted'");
}
System.out.println("Succeded");
// Reinit a new bulk
bulk = client.prepareBulk();
}
} catch (ElasticSearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void destroy(Client client) throws Exception {
client.close();
}
}