Hello,
I installed ES on 6 servers (4 cores, 4 GB physical RAM each). ES gets a 2 GB
Java heap (-Xmx2g).
I'm using the pyes client with connection pooling, inserting a continuous
stream of data (~50 KB per insert). The client uses 6 threads for
insertion, over Thrift. The index has 20 shards and 2 replicas (set up
roughly as in the sketch below). After a few minutes (~4 min) the client
starts reporting timeouts (full traceback further down).
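For context, the index setup looks roughly like this; this is a sketch, not my
exact production code, and I'm assuming here that pyes' create_index accepts a
plain settings dict:

from pyes import ES

conn = ES(['host1:9500'], timeout=4)      # one of the 6 hosts
settings = {
    'number_of_shards': 20,               # shard/replica counts as described above
    'number_of_replicas': 2,
}
conn.create_index('archive', settings)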
I tried bulk insertion as well as normal inserts, with the same result. I
then tried sending only 30 x 50 KB documents per client, with 6 threads and a
sleep(1 sec) between batches. That doesn't help much either.
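The throttled bulk variant looks roughly like this (again a sketch; I'm
assuming index(..., bulk=True) only queues the document and force_bulk() sends
whatever is still pending, and "batch" here stands for the 30 parsed emails):

import time

conn = get_conn()                         # same connection helper as in the code below
for doc in batch:                         # batch: up to 30 parsed emails, ~50 KB each
    conn.index(doc, 'archive', 'email', bulk=True)
conn.force_bulk()                         # push anything still queued
time.sleep(1)                             # then the 1 sec pause mentioned above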
I heard that people running Solandra on roughly the same hardware
configuration manage about a 4 Mbit/s insert stream.
What do you think about my results? Is the bottleneck the JVM heap? If so,
is it normal that I can index only about 5 docs per second? How can I work
out the right cluster size for a given stream size?
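To put my numbers next to the Solandra figure, this is the back-of-the-envelope
arithmetic I'm doing (nothing ES-specific, just assuming ~50 KB per document):

doc_size_kb = 50
my_docs_per_sec = 5
my_throughput_kb = doc_size_kb * my_docs_per_sec       # ~250 KB/s, i.e. ~2 Mbit/s

solandra_mbit = 4
solandra_kb = solandra_mbit * 1000 / 8                 # ~500 KB/s
solandra_docs_per_sec = solandra_kb / doc_size_kb      # ~10 docs/s at 50 KB each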
I can't find any related info on the website, so thanks for any advice.
The errors are below, plus the piece of code I'm using for insertion
(multithreading in the client is done by celery with monkey_patch()).
Thanks
Client error: timed out
Traceback (most recent call last):
File "/usr/lib/python2.6/site-packages/pyes-0.15.0-py2.6.egg/pyes/
connection.py", line 166, in _client_call
return getattr(conn.client, attr)(*args, **kwargs)
File "/usr/lib/python2.6/site-packages/pyes-0.15.0-py2.6.egg/pyes/
pyesthrift/Rest.py", line 40, in execute
return self.recv_execute()
File "/usr/lib/python2.6/site-packages/pyes-0.15.0-py2.6.egg/pyes/
pyesthrift/Rest.py", line 51, in recv_execute
(fname, mtype, rseqid) = self._iprot.readMessageBegin()
File "/usr/lib/python2.6/site-packages/thrift/protocol/
TBinaryProtocol.py", line 126, in readMessageBegin
sz = self.readI32()
File "/usr/lib/python2.6/site-packages/thrift/protocol/
TBinaryProtocol.py", line 203, in readI32
buff = self.trans.readAll(4)
File "/usr/lib/python2.6/site-packages/thrift/transport/
TTransport.py", line 58, in readAll
chunk = self.read(sz-have)
File "/usr/lib/python2.6/site-packages/thrift/transport/
TTransport.py", line 160, in read
self.__rbuf = StringIO(self.__trans.read(max(sz,
self.__rbuf_size)))
File "/usr/lib/python2.6/site-packages/thrift/transport/TSocket.py",
line 94, in read
buff = self.handle.recv(sz)
File "/usr/lib/python2.6/site-packages/eventlet-0.9.15-py2.6.egg/
eventlet/greenio.py", line 238, in recv
timeout_exc=socket.timeout("timed out"))
File "/usr/lib/python2.6/site-packages/eventlet-0.9.15-py2.6.egg/
eventlet/hubs/init.py", line 121, in trampoline
return hub.switch()
File "/usr/lib/python2.6/site-packages/eventlet-0.9.15-py2.6.egg/
eventlet/hubs/hub.py", line 177, in switch
return self.greenlet.switch()
timeout: timed out
On the servers, the following error message shows up in the logs about once
per second:
[2011-04-18 23:27:17,265][DEBUG][action.admin.indices.flush] [Skunge the Laxidazian Troll] [archive][4], node[V2LD8E_GRr-KC9Ve2IoZxw], [R], s[STARTED]: Failed to execute [org.elasticsearch.action.admin.indices.flush.FlushRequest@7fab2a74]
org.elasticsearch.transport.RemoteTransportException: [Obituary][inet[/IP:9300]][indices/flush/shard]
Caused by: org.elasticsearch.index.engine.FlushNotAllowedEngineException: [archive][4] Already flushing...
    at org.elasticsearch.index.engine.robin.RobinEngine.flush(RobinEngine.java:586)
    at org.elasticsearch.index.shard.service.InternalIndexShard.flush(InternalIndexShard.java:402)
    at org.elasticsearch.action.admin.indices.flush.TransportFlushAction.shardOperation(TransportFlushAction.java:114)
    at org.elasticsearch.action.admin.indices.flush.TransportFlushAction.shardOperation(TransportFlushAction.java:50)
    at org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction$ShardTransportHandler.messageReceived(TransportBroadcastOperationAction.java:380)
    at org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction$ShardTransportHandler.messageReceived(TransportBroadcastOperationAction.java:369)
    at org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler.run(MessageChannelHandler.java:238)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
The code: I'm inserting the data via indexEmailData(self, data, _id) below.
from pyes import ES

def get_conn():
    # changed to Thrift
    iconn = ES(['host1:9500'],  # ... 6 hosts in total
               timeout=4, max_retries=30, bulk_size=50)
    return iconn

class ESload():
    def __init__(self):
        self._indexName = 'archive'
        self.iconn = get_conn()
        self.bulkSize = 0

    def createIndex(self):
        #_indexType = 'email'
        status = None
        try:
            status = self.iconn.status(self._indexName)
        except Exception:
            # index does not exist yet
            # put the shards/replicas into creation... ?
            #iconn.create_index(_indexName)
            mappingsEmail = {
                u'inbox': {'type': u'string'},
                u'from': {'type': u'string'},
                u'subject': {'type': u'string'},
                u'date': {'type': u'date'},
                u'messageID': {'type': u'string', 'index': 'not_analyzed'},
                u'attachments': {'type': u'string'},
                u'size': {'type': u'long', 'index': 'not_analyzed'},
                u'body': {'type': u'string'}
            }
            mappingsEnv = {
                u'sender': {'type': u'string'},
                u'recipient': {'type': u'string'},
                u'ip': {'type': 'ip'}
            }
            mappingsSource = {
                '_source': {'compress': 'true'}
            }
            status = self.iconn.put_mapping("email",
                                            {'properties': mappingsEmail},
                                            self._indexName)
            self.iconn.put_mapping("email", mappingsSource, self._indexName)
            self.iconn.put_mapping("envelope",
                                   {'properties': mappingsEnv},
                                   self._indexName)
            self.iconn.put_mapping("envelope", mappingsSource, self._indexName)

    def indexEmailData(self, data, _id):
        self.iconn.index(data, self._indexName, "email", _id)
        #self.iconn.refresh([self._indexName])
        self.bulkSize += 1          # counter only; nothing is flushed here
        if self.bulkSize == 350:
            #self.iconn.refresh([self._indexName])
            self.bulkSize = 0
        print self.bulkSize