How to use percolator with jdbc-river using java program?
I am not able to pass the document to percolator query using jdbc-river APi.
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.textQuery;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.TermQuery;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.percolator.PercolatorExecutor;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.river.jdbc.Action;
import org.elasticsearch.river.jdbc.DefaultAction;
import org.elasticsearch.river.jdbc.Merger;
import org.elasticsearch.river.jdbc.SQLService;
public class Trial {
static Client client= null;
static long tRows=0L;
public static void main(String[] args) {
client=Trial.createElasticSearchInstance();
Trial.testStarQuery(client);
}
public static Client createElasticSearchInstance(){
Settings s = ImmutableSettings.settingsBuilder()
.put("discovery.zen.ping.multicast.enabled", false).put("gateway.expected_nodes", 1).put("index.number_of_replicas", 1)
.put("cluster.name","ElasticSearch")
.build();
Node node = NodeBuilder.nodeBuilder().settings(s).node();
client = node.client();
return client;
}
public static void testStarQuery(final Client client) {
try {
String driverClassName = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/mysql";
String username = "root";
String password = "";
String sql = "select * from mysampledata";
List<Object> params = new ArrayList();
int fetchsize = 0;
//QueryBuilder qb = QueryBuilders.termQuery("Name", "Rana");
//XContentBuilder xb = XContentFactory.jsonBuilder().startObject().field("Query", )
client.prepareIndex("_percolator","jdbc","Q1")
.setSource(jsonBuilder().startObject().field("query",termQuery("Name","Sanjay")).endObject())
.setRefresh(true)
.execute()
.actionGet();
Action listener = new DefaultAction() {
@Override
public void index(String index, String type, String id, long version, XContentBuilder builder) throws IOException {
// System.err.println("index=" + index + " type=" + type + " id=" + id + " builder=" + builder.string());
//client.admin().indices().prepareCreate("_percolator").setSettings(settingsBuilder().put("index.number_of_shards", 5)).execute().actionGet();
// client.admin().indices().prepareCreate("xriver").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
IndexResponse rsp = client.prepareIndex("_river","jdbc",id).setSource(builder).execute().actionGet();
client.admin().indices().refresh(Requests.refreshRequest("_all")).actionGet();
// client.admin().indices().refresh(Requests.refreshRequest("_all")).actionGet();
}
};
SQLService service = new SQLService();
Connection connection = service.getConnection(driverClassName, url, username, password, true);
PreparedStatement statement = service.prepareStatement(connection, sql);
service.bind(statement, params);
ResultSet results = service.execute(statement, fetchsize);
Merger merger = new Merger(listener, 1L);
long rows = 0L;
while (service.nextRow(results, merger)) {
rows++;
//System.out.println ("hello");
}
merger.close();
System.err.println("rows = " + rows);
service.close(results);
service.close(statement);
service.close(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
}