Issue while using Elasticsearch (EsTap) for processing data

Hi, I am new to Elasticsearch and I am stuck on an issue. I am developing an application with the Cascading API.
I am processing 10 million rows of data with 43 columns. When I write the data to the default Hfs sink tap, the dump completes in 1-2 minutes, but when I use EsTap instead of the Hfs tap it takes 1 hour. For Elasticsearch we configured 3 nodes, all acting as both master and data nodes, with "bootstrap.memory_lock: true"; everything else is left at the default settings.
Do I need to change the configuration so that the process takes less time? Please help. Thanks in advance.

String inputPath = args[0] + File.separator + "10_million_rows.csv";

// Column names shared by the source tap and both sink variants.
Fields fields = new Fields( "empid","gender","title","nameset","surname","city","statefull","zipcode","header1","header2","header3","header4","header5","header6","header7","header8","header9","header10","header11","header12","header13","header14","header15","header16","header17","header18","header19","header20","header21","header22","header23","header24","header25","header26","header27","header28","header29","header30","header31","header32","expr1","expr2","expr3","expr4" );

// Source: comma-delimited CSV with a header row.
Tap inputTap = new Hfs( new TextDelimited( fields, true, "," ), inputPath );
Pipe pipe = new Pipe( "pipe" );

// Hfs sink variant (completes in 1-2 minutes):
// Tap sinkTap = new Hfs( new TextDelimited( fields, true, "," ), "/hdfsdata/output" );

// Elasticsearch sink variant (takes about 1 hour):
Tap sinkTap = new EsTap( "master-host", 9200, "index1/type1", fields );

FlowDef flowDef = FlowDef.flowDef()
    .addSource( pipe, inputTap )
    .addTailSink( pipe, sinkTap );

Properties properties = new Properties();
Flow flow = new Hadoop2MR1FlowConnector( properties ).connect( flowDef );
flow.complete();

---------------------------------- Node1 -----------------------------------

cluster.name: electrik-io
node.name: master
node.master: true
node.data: true
path.data: "/secondary/elasticsearch/data"
path.logs: "/secondary/elasticsearch/logs"
bootstrap.memory_lock: true
bootstrap.system_call_filter: false
network.host: ["master", "localhost"]
http.port: 9200
transport.tcp.port: 9300
http.enabled: true
discovery.zen.ping.unicast.hosts: ["master", "slave1", "slave2"]
discovery.zen.minimum_master_nodes: 3

---------------------------------- Node2 -----------------------------------

cluster.name: electrik-io
node.name: slave1
node.master: true
node.data: true
path.data: "/secondary/elasticsearch/data"
path.logs: "/secondary/elasticsearch/logs"
bootstrap.memory_lock: true
bootstrap.system_call_filter: false
network.host: ["slave1", "localhost"]
http.port: 9200
transport.tcp.port: 9300
http.enabled: true
discovery.zen.ping.unicast.hosts: ["master", "slave1", "slave2"]
discovery.zen.minimum_master_nodes: 3

---------------------------------- Node3 -----------------------------------

cluster.name: electrik-io
node.name: slave2
node.master: true
node.data: true
path.data: "/secondary/elasticsearch/data"
path.logs: "/secondary/elasticsearch/logs"
bootstrap.memory_lock: true
bootstrap.system_call_filter: false
network.host: ["slave2", "localhost"]
http.port: 9200
transport.tcp.port: 9300
http.enabled: true
discovery.zen.ping.unicast.hosts: ["master", "slave1", "slave2"]
discovery.zen.minimum_master_nodes: 3

I moved the question to #elasticsearch-and-hadoop

@Kunal_Ghosh One option for optimizing your job is to tune the batch output sizes for Elasticsearch on the EsTap configuration. Feel free to peruse our documentation on performance for more information.

@james.baiera Thanks for the prompt response!
I am using Elasticsearch version 5.5.0. The default number of shards is 5, so in my case I have 3 nodes = 15 shards. Will this have an adverse effect on performance?
Also, how do I configure the number of shards per node in Elasticsearch 5.5.0? In earlier versions it was set in the elasticsearch.yml file, but now I cannot find where to configure it.

I'm not sure I understand how your math pans out: Shard counts are per index and are distributed across the cluster. Shard counts are not based on the number of nodes you have, unless you mean that you have an index with 15 shards and it happens to have 5 shards on each of your 3 nodes?

Shards are configurable at index creation time via the index's settings. If no settings are provided, the default number of shards and replicas for the cluster are used.
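
As a rough sketch of what "via the index's settings" means in practice: the shard and replica counts go in the body of the create-index request, sent before the job first writes to the index. The class and method names here are made up for illustration, and the values 3 and 1 are placeholders rather than a recommendation for your cluster.

```java
public class IndexSettingsSketch {

    // Builds the JSON body for a create-index request ("PUT /<index>")
    // that sets the primary shard and replica counts explicitly.
    static String settingsBody(int shards, int replicas) {
        return "{\"settings\":{\"index\":{"
                + "\"number_of_shards\":" + shards + ","
                + "\"number_of_replicas\":" + replicas
                + "}}}";
    }

    public static void main(String[] args) {
        // Illustrative values only: 3 primary shards, 1 replica of each.
        System.out.println("PUT /index1");
        System.out.println(settingsBody(3, 1));
    }
}
```

The printed body can be sent with any HTTP client to port 9200 on one of the nodes. The number of primary shards is fixed once the index exists, so this has to happen before the first write.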

In terms of execution time for your job, have you modified the batch sizes for the job at all? You can tune the maximum batch request sizes by using es.batch.size.bytes (default 1mb), es.batch.size.entries (default 1000), and es.batch.write.refresh (default true). These are often good starting numbers but tend to be fairly limiting for larger datasets.
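
A minimal sketch of setting those three options, assuming they are passed through the same Properties object that your code already hands to the flow connector. The class name and the specific values (5mb, 5000, false) are only illustrative starting points to experiment with, not tested recommendations.

```java
import java.util.Properties;

public class BatchSettingsSketch {

    // Returns Properties carrying larger bulk-request limits for the connector.
    static Properties batchSettings() {
        Properties properties = new Properties();
        // Flush a bulk request once it reaches this size (default 1mb).
        properties.setProperty("es.batch.size.bytes", "5mb");
        // ...or once it holds this many documents (default 1000).
        properties.setProperty("es.batch.size.entries", "5000");
        // Skip the index refresh after the bulk write finishes (default true).
        properties.setProperty("es.batch.write.refresh", "false");
        return properties;
    }

    public static void main(String[] args) {
        batchSettings().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In your job this would replace the empty `new Properties()` passed to `Hadoop2MR1FlowConnector`. Keep in mind that the batch limits apply to each writing task separately, so the total load on the cluster grows with the number of concurrent tasks.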

Another thing that might be worth tuning is the job's parallelism. You want to make sure you have a reasonable rate of indexing with enough writing clients to push your data, but not so many clients that you start seeing rejections from Elasticsearch. Remember a good rule of thumb is that each node can handle about 50 outstanding bulk requests in its queue at a time, so you want to be below this mark for concurrent writers.
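
One possible way to keep the writer count under that mark, sketched under a few assumptions: each running map task acts as one writer, your Hadoop version supports the `mapreduce.job.running.map.limit` property (Hadoop 2.7 or later), and the figure of 50 is treated conservatively as a cap on the whole job rather than per node. The class and helper names are hypothetical.

```java
import java.util.Properties;

public class WriterLimitSketch {

    // Caps the requested writer count strictly below the queue rule of thumb,
    // while always allowing at least one writer.
    static int cappedWriters(int requestedWriters, int queueRuleOfThumb) {
        return Math.max(1, Math.min(requestedWriters, queueRuleOfThumb - 1));
    }

    // Returns Properties that limit how many map tasks of the job run at once.
    static Properties withMapLimit(int writers) {
        Properties properties = new Properties();
        properties.setProperty("mapreduce.job.running.map.limit", Integer.toString(writers));
        return properties;
    }

    public static void main(String[] args) {
        // Assumption: the job would otherwise run 200 map tasks at once.
        int writers = cappedWriters(200, 50);
        System.out.println(withMapLimit(writers));
    }
}
```

If you still see bulk rejections in the Elasticsearch logs at a given limit, lower it further; if indexing is slow with no rejections, there is probably room to raise it.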
