Cluster sizing + speed

Hello,

I installed ES on 6 servers (4 cores, 4 GB physical RAM each). ES gets a 2 GB
Java heap (-Xmx2g).

I'm using the pyes client with connection pooling, inserting a continuous
stream of data (~50 KB per insert). The client uses 6 threads for insertion,
and I'm using Thrift. The index has replication 2 and 20 shards. After a few
minutes (about 4) the client starts reporting the timeout error shown below.

I tried bulk insertion as well as normal inserts (same results). I then tried
sending 30 x 50 KB of data per client with 6 threads and a sleep(1 sec) in
between. It doesn't help much.
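
A minimal sketch of what I mean by the bulk attempt (assuming pyes 0.15's
index(..., bulk=True) buffering and force_bulk(); the host list is shortened):

from pyes import ES

conn = ES(['host1:9500'],            # shortened; 6 hosts in the real list
          timeout=4, max_retries=30, bulk_size=50)

def insert_batch(docs):
    # docs: iterable of (doc_id, doc_dict), each document ~50 KB
    for doc_id, doc in docs:
        # with bulk=True pyes only buffers the document; nothing is sent
        # until bulk_size documents have accumulated
        conn.index(doc, 'archive', 'email', doc_id, bulk=True)
    conn.force_bulk()    # flush whatever is still buffered for this batch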

I've heard that people running Solandra on roughly the same hardware manage
about a 4 Mbit insert stream.

What do you think about my results? Is the JVM heap the bottleneck? If so, is
it normal that I can only index about 5 docs per second? How can I work out
the right cluster size for a given insert stream?
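
Just to put the numbers side by side (back-of-envelope arithmetic on the
figures above):

# ~5 docs/sec at ~50 KB per document
docs_per_sec = 5
doc_size_kb = 50
kb_per_sec = docs_per_sec * doc_size_kb        # 250 KB/s
mbit_per_sec = kb_per_sec * 8 / 1000.0         # ~2 Mbit/s vs the ~4 Mbit reported for Solandra
print mbit_per_sec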

I can't find any related info on the website, so thanks in advance for any
advice.

The errors are below, plus the piece of code I'm using for insertion
(multithreading in the client is done by celery with monkey_patch()).
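
The client side is green-threaded roughly like this (a simplified
illustration, not the real task module; only eventlet.monkey_patch() and the
celery @task decorator are what I actually use):

import eventlet
eventlet.monkey_patch()   # socket/thread are patched, so pyes/thrift I/O runs on green threads

from celery.task import task

@task
def index_email(data, doc_id):
    # each task hands the document to the loader class shown further below
    ESload().indexEmailData(data, doc_id)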

Thanks

Client error: timed out
Traceback (most recent call last):
  File "/usr/lib/python2.6/site-packages/pyes-0.15.0-py2.6.egg/pyes/connection.py", line 166, in _client_call
    return getattr(conn.client, attr)(*args, **kwargs)
  File "/usr/lib/python2.6/site-packages/pyes-0.15.0-py2.6.egg/pyes/pyesthrift/Rest.py", line 40, in execute
    return self.recv_execute()
  File "/usr/lib/python2.6/site-packages/pyes-0.15.0-py2.6.egg/pyes/pyesthrift/Rest.py", line 51, in recv_execute
    (fname, mtype, rseqid) = self._iprot.readMessageBegin()
  File "/usr/lib/python2.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 126, in readMessageBegin
    sz = self.readI32()
  File "/usr/lib/python2.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 203, in readI32
    buff = self.trans.readAll(4)
  File "/usr/lib/python2.6/site-packages/thrift/transport/TTransport.py", line 58, in readAll
    chunk = self.read(sz-have)
  File "/usr/lib/python2.6/site-packages/thrift/transport/TTransport.py", line 160, in read
    self.__rbuf = StringIO(self.__trans.read(max(sz, self.__rbuf_size)))
  File "/usr/lib/python2.6/site-packages/thrift/transport/TSocket.py", line 94, in read
    buff = self.handle.recv(sz)
  File "/usr/lib/python2.6/site-packages/eventlet-0.9.15-py2.6.egg/eventlet/greenio.py", line 238, in recv
    timeout_exc=socket.timeout("timed out"))
  File "/usr/lib/python2.6/site-packages/eventlet-0.9.15-py2.6.egg/eventlet/hubs/__init__.py", line 121, in trampoline
    return hub.switch()
  File "/usr/lib/python2.6/site-packages/eventlet-0.9.15-py2.6.egg/eventlet/hubs/hub.py", line 177, in switch
    return self.greenlet.switch()
timeout: timed out

On the servers, the following error message shows up in the logs about once a
second:

[2011-04-18 23:27:17,265][DEBUG][action.admin.indices.flush] [Skunge the Laxidazian Troll] [archive][4], node[V2LD8E_GRr-KC9Ve2IoZxw], [R], s[STARTED]: Failed to execute [org.elasticsearch.action.admin.indices.flush.FlushRequest@7fab2a74]
org.elasticsearch.transport.RemoteTransportException: [Obituary][inet[/IP:9300]][indices/flush/shard]
Caused by: org.elasticsearch.index.engine.FlushNotAllowedEngineException: [archive][4] Already flushing...
    at org.elasticsearch.index.engine.robin.RobinEngine.flush(RobinEngine.java:586)
    at org.elasticsearch.index.shard.service.InternalIndexShard.flush(InternalIndexShard.java:402)
    at org.elasticsearch.action.admin.indices.flush.TransportFlushAction.shardOperation(TransportFlushAction.java:114)
    at org.elasticsearch.action.admin.indices.flush.TransportFlushAction.shardOperation(TransportFlushAction.java:50)
    at org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction$ShardTransportHandler.messageReceived(TransportBroadcastOperationAction.java:380)
    at org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction$ShardTransportHandler.messageReceived(TransportBroadcastOperationAction.java:369)
    at org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler.run(MessageChannelHandler.java:238)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)


Code

I'm inserting data with indexEmailData(self, data, _id):

from pyes import ES


def get_conn():
    # change to thrift
    iconn = ES(['host1:9500'],          # ... 6 hosts in total
               timeout=4, max_retries=30, bulk_size=50)
    return iconn


class ESload():

    def __init__(self):
        self._indexName = 'archive'
        self.iconn = get_conn()
        self.bulkSize = 0

    def createIndex(self):
        # _indexType = 'email'
        status = None
        try:
            status = self.iconn.status(self._indexName)
        except:
            # index not there yet -> put the shards/replicas into creation... ?
            # iconn.create_index(_indexName)

            mappingsEmail = {
                u'inbox': {'type': u'string'},
                u'from': {'type': u'string'},
                u'subject': {'type': u'string'},
                u'date': {'type': u'date'},
                u'messageID': {'type': u'string', 'index': 'not_analyzed'},
                u'attachments': {'type': u'string'},
                u'size': {'type': u'long', 'index': 'not_analyzed'},
                u'body': {'type': u'string'},
            }

            mappingsEnv = {
                u'sender': {'type': u'string'},
                u'recipient': {'type': u'string'},
                u'ip': {'type': 'ip'},
            }

            mappingsSource = {
                '_source': {'compress': 'true'}
            }

            status = self.iconn.put_mapping(
                "email", {'properties': mappingsEmail}, self._indexName)
            self.iconn.put_mapping("email", mappingsSource, self._indexName)

            self.iconn.put_mapping(
                "envelope", {'properties': mappingsEnv}, self._indexName)
            self.iconn.put_mapping("envelope", mappingsSource, self._indexName)

    def indexEmailData(self, data, _id):
        self.iconn.index(data, self._indexName, "email", _id)
        # self.iconn.refresh([self._indexName])
        self.bulkSize += 1

        if self.bulkSize == 350:
            # self.iconn.refresh([self._indexName])
            self.bulkSize = 0
            print self.bulkSize    # note: prints 0 here, the counter was just reset
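
If it matters, the index with 20 shards and replication 2 could also be
created explicitly up front instead of via the commented-out call above; a
minimal sketch, assuming pyes's create_index() accepts a settings dict:

# Sketch only: create 'archive' with explicit shard/replica settings
# (assumes create_index(index, settings) as in the pyes docs).
settings = {
    'index': {
        'number_of_shards': 20,
        'number_of_replicas': 2,
    }
}
conn = get_conn()                       # get_conn() as defined above
conn.create_index('archive', settings)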

Hello,

So I connected to each node in the cluster over JMX. I'm inserting with one
client (process) while watching the JVMs with jvisualvm. During data insertion
into the cluster (not bulk, average size of one insert ~40 KB), the JVM CPU
usage on all nodes sits around 3%. After a while, CPU usage on three of the
nodes suddenly jumps to ~70%, and at that point error1 (attached below) shows
up on the client side. It takes about 2-3 minutes and then everything works
fine again; this behaviour repeats in the same scenario at roughly periodic
intervals.

Does this mean the cluster is badly balanced? I have 6 nodes, replication = 2,
and 20 shards. What is the usual way to tune this?
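
In case the numbers help, this is how I plan to look at health and shard
allocation over HTTP (a sketch against the standard REST endpoints, assuming
port 9200 is open; host1 is a placeholder):

import json
import urllib2

# Cluster health: status plus active/relocating/unassigned shard counts
health = json.load(urllib2.urlopen('http://host1:9200/_cluster/health'))
print health['status'], health['active_shards'], health['unassigned_shards']

# Cluster state: which nodes exist (shard routing is also in this response)
state = json.load(urllib2.urlopen('http://host1:9200/_cluster/state'))
for node_id, node in state['nodes'].items():
    print node_id, node['name']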

Thanks for any ideas.

(error1)
Traceback (most recent call last):
  File "/usr/lib/python2.6/site-packages/pyes-0.15.0-py2.6.egg/pyes/connection.py", line 166, in _client_call
    return getattr(conn.client, attr)(*args, **kwargs)
  File "/usr/lib/python2.6/site-packages/pyes-0.15.0-py2.6.egg/pyes/pyesthrift/Rest.py", line 40, in execute
    return self.recv_execute()
  File "/usr/lib/python2.6/site-packages/pyes-0.15.0-py2.6.egg/pyes/pyesthrift/Rest.py", line 51, in recv_execute
    (fname, mtype, rseqid) = self._iprot.readMessageBegin()
  File "/usr/lib/python2.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 126, in readMessageBegin
    sz = self.readI32()
  File "/usr/lib/python2.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 203, in readI32
    buff = self.trans.readAll(4)
  File "/usr/lib/python2.6/site-packages/thrift/transport/TTransport.py", line 58, in readAll
    chunk = self.read(sz-have)
  File "/usr/lib/python2.6/site-packages/thrift/transport/TTransport.py", line 160, in read
    self.__rbuf = StringIO(self.__trans.read(max(sz, self.__rbuf_size)))
  File "/usr/lib/python2.6/site-packages/thrift/transport/TSocket.py", line 94, in read
    buff = self.handle.recv(sz)
timeout: timed out

Only one node of the 6-node cluster has GC info like this in its log:

[2011-04-19 03:29:25,634][INFO ][monitor.jvm] [Ox] [gc][ConcurrentMarkSweep][80] took [5.5s]/[2.3s], reclaimed [-6.3E7b], leaving [1.6gb] used, max [2.1gb]


And one more error from one node:

[2011-04-19 04:01:19,233][WARN ][transport.netty] [Sunstreak] Exception caught on netty layer [[id: 0x498ab963, /IP:48360 => /ip:9300]]
java.lang.OutOfMemoryError: Java heap space
