Jdbc-river with Java Percolator API

How to use percolator with jdbc-river using java program?
I am not able to pass the document to percolator query using jdbc-river APi.

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.textQuery;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.TermQuery;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.percolator.PercolatorExecutor;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.river.jdbc.Action;
import org.elasticsearch.river.jdbc.DefaultAction;
import org.elasticsearch.river.jdbc.Merger;
import org.elasticsearch.river.jdbc.SQLService;

public class Trial {
static Client client= null;
static long tRows=0L;
public static void main(String[] args) {
client=Trial.createElasticSearchInstance();
Trial.testStarQuery(client);
}
public static Client createElasticSearchInstance(){

	Settings s = ImmutableSettings.settingsBuilder()
     .put("discovery.zen.ping.multicast.enabled", false).put("gateway.expected_nodes", 1).put("index.number_of_replicas", 1)
     .put("cluster.name","ElasticSearch")
     .build();
	 Node node = NodeBuilder.nodeBuilder().settings(s).node();
	
	client = node.client();
	return client;
}

 
public static void testStarQuery(final Client client) {
	   	 
    try {
        String driverClassName = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://localhost:3306/mysql";
        String username = "root";
        String password = "";
        String sql = "select * from mysampledata";
        List<Object> params = new ArrayList();
        int fetchsize = 0;
       
        //QueryBuilder qb = QueryBuilders.termQuery("Name", "Rana");
        //XContentBuilder xb = XContentFactory.jsonBuilder().startObject().field("Query", )
       
        client.prepareIndex("_percolator","jdbc","Q1")
        .setSource(jsonBuilder().startObject().field("query",termQuery("Name","Sanjay")).endObject()) 
        .setRefresh(true)
        .execute()
        .actionGet();

        Action listener = new DefaultAction() {
        	
        	
            @Override
            public void index(String index, String type, String id, long version, XContentBuilder builder) throws IOException {
            // System.err.println("index=" + index + " type=" + type + " id=" + id + " builder=" + builder.string());
            	//client.admin().indices().prepareCreate("_percolator").setSettings(settingsBuilder().put("index.number_of_shards", 5)).execute().actionGet();
               
  
                
             
      //          client.admin().indices().prepareCreate("xriver").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
             IndexResponse rsp = client.prepareIndex("_river","jdbc",id).setSource(builder).execute().actionGet();
                client.admin().indices().refresh(Requests.refreshRequest("_all")).actionGet();
                
            
               //  client.admin().indices().refresh(Requests.refreshRequest("_all")).actionGet();
                 
               
            }
            
        };
     
    
        
        SQLService service = new SQLService();
        Connection connection = service.getConnection(driverClassName, url, username, password, true);
        PreparedStatement statement = service.prepareStatement(connection, sql);
        service.bind(statement, params);
        ResultSet results = service.execute(statement, fetchsize);
            
        Merger merger = new Merger(listener, 1L);
        long rows = 0L;

        while (service.nextRow(results, merger)) {
            rows++;
            //System.out.println ("hello");
            }
       
        merger.close();
                         
        System.err.println("rows = " + rows);
        service.close(results);
        service.close(statement);
        service.close(connection);
        
   
        
       
    } catch (Exception e) {
        e.printStackTrace();
    }
 
}

}