Indexing Progressively Slows on Thrift Input Stream

Hi,

I currently have ES installed on a Xen cluster of 14 instances, each with:

400 GB Disk space
2 Intel cores
4 GB RAM

All instances are on the same network segment.

I am storing documents whose mapping is identical to the twitter one, with
a collator and an analyzer operating on the text field.

I am running 14 Python threads, each writing to one of the instances on
port 9500 using the Thrift transport.

For each Thrift transport open/close cycle I am passing 350 documents.

The index has 56 shards and no replicas.

When I begin building the index I am writing ~4500 documents/sec, but this
performance degrades: at 5M documents the average rate is down to
1400 documents/sec, and at 10M it is below 1000 documents/sec.

When I double the number of shards I get write errors.

Any suggestions?

Regards,

Wolf

index setup:

curl -XPUT 'http://192.168.0.129:9200/twitter' -d '{
  "index" : {
    "numberOfShards" : 42,
    "numberOfReplicas" : 0,
    "analysis" : {
      "analyzer" : {
        "collation" : {
          "tokenizer" : "keyword",
          "filter" : ["myCollator"]
        },
        "my_analyzer" : {
          "type" : "igo"
        }
      },
      "filter" : {
        "myCollator" : {
          "type" : "icu_collation",
          "language" : "ja"
        }
      }
    }
  }
}'

curl -XPUT 'http://192.168.0.129:9200/twitter/_mapping' -d @tweet_mapping

Python class for writing tweet docs:

import json

from thrift.transport import TSocket, TTransport
from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated
# Rest / RestRequest come from the Python bindings generated from
# elasticsearch.thrift; adjust the import path to wherever you generated them.
from elasticsearch import Rest
from elasticsearch.ttypes import RestRequest


class DocWriter:
    def __init__(self, host, data_file, initial_index):
        self.index = initial_index
        self.uri = "/twitter/tweet/"
        self.host = "192.168.0." + str(host)
        self.data_file = data_file
        # one Thrift connection per writer, opened once and reused for the
        # whole file instead of per batch
        self.socket = TSocket.TSocket(self.host, 9500)
        self.transport = TTransport.TBufferedTransport(self.socket)
        self.protocol = TBinaryProtocolAccelerated(self.transport)
        self.client = Rest.Client(self.protocol)

    def write_file_to_es(self):
        with open(self.data_file, 'r') as f:
            self.transport.open()
            for line in f:
                self.index += 1
                doc = json.loads(line.rstrip("\n"), object_hook=self.fix_obj)
                # method=1 corresponds to PUT in the elasticsearch Thrift IDL
                request = RestRequest(method=1,
                                      uri=self.uri + str(self.index),
                                      headers={},
                                      body=json.dumps(doc))
                response = self.client.execute(request)
                body = json.loads(response.body)
                if 'ok' not in body:
                    # indexing failed: report the response and stop
                    print body
                    self.transport.close()
                    return self.index
        self.transport.close()
        return self.index

    def fix_obj(self, obj):
        # force retweet_count to a string so its type is consistent
        # across documents
        if 'retweeted_status' in obj:
            if 'retweet_count' in obj['retweeted_status']:
                obj['retweeted_status']['retweet_count'] = \
                    str(obj['retweeted_status']['retweet_count'])
        elif 'retweet_count' in obj:
            obj['retweet_count'] = str(obj['retweet_count'])
        return obj
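
For reference, a hypothetical driver for the class above, matching the
described setup of 14 threads, one per instance; the host octets, file
names and id ranges are made-up values:

# Hypothetical driver: one thread per instance/file (made-up values).
import threading

def run(host_octet, path, start_id):
    DocWriter(host_octet, path, start_id).write_file_to_es()

threads = []
for i in range(14):
    # give each thread a disjoint id range so document ids do not collide
    t = threading.Thread(target=run,
                         args=(130 + i, "tweets-%02d.json" % i, i * 1000000))
    t.start()
    threads.append(t)

for t in threads:
    t.join()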

A few things that might affect that:

  1. Have you made sure you increased the file descriptor limit?
  2. If you open and close a connection every 350 docs, from 15 processes, all working against a single server, you create many sockets. When you close a socket it does not really get closed; it ends up in the TIME_WAIT state. If you load the system enough, the OS will throttle the opening of new sockets until old ones have been recycled. I suggest you keep the same connection open for the whole indexing process, and work against several hosts.
  3. It might be a problem in Thrift; you might want to try HTTP just to check. If you use Python, use pyes, it's a good client for elasticsearch.
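
For point 1, a quick way to see the limit from inside the indexing process
(a standard-library sketch, not part of the original code):

# Sketch: print the open-files (file descriptor) limits this process sees.
import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print "open files: soft limit=%d, hard limit=%d" % (soft, hard)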

Also, did you see any exceptions in the elasticsearch logs, or in the responses you got back from it?

We can also talk about options for improving performance on the elasticsearch side, but I am not sure you have hit that limit yet. Using the bulk API will help the most in your case.
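
To make the bulk suggestion concrete, here is a minimal sketch of bulk
indexing over plain HTTP using only the standard library; the host, port,
index/type names, id scheme and batch size are illustrative assumptions,
not details from this thread:

# Sketch: send batches of documents to the _bulk endpoint over one
# HTTP connection (illustrative values throughout).
import json
import httplib

def bulk_index(docs, host="192.168.0.129", port=9200, batch_size=500):
    conn = httplib.HTTPConnection(host, port)
    for start in xrange(0, len(docs), batch_size):
        lines = []
        for i, doc in enumerate(docs[start:start + batch_size]):
            # one action line, then the document source, each on its own line
            action = {"index": {"_index": "twitter", "_type": "tweet",
                                "_id": str(start + i + 1)}}
            lines.append(json.dumps(action))
            lines.append(json.dumps(doc))
        body = "\n".join(lines) + "\n"   # the bulk body must end with a newline
        conn.request("POST", "/_bulk", body)
        resp = conn.getresponse()
        data = resp.read()               # drain so the connection can be reused
        if resp.status != 200:
            print resp.status, data
    conn.close()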

Some tips that can help you:

  • using pyes over Thrift gives you automatic balancing across the es servers and a very stable, thread-safe Thrift interface (pyes shares its Thrift implementation with pycassa, with some tricks for es)

  • reading the file with readline() in a loop performs poorly. Iterate over the file object (which buffers reads) instead; see the first sketch after this list.

  • doing bulk indexing instead of one PUT per record improves performance. The bulk size must be tested to find the best value: try bulks of 200, 300, 400, 500, depending on the size of the records.

  • file descriptor and socket problems are very nasty, as Shay said. Every connection uses a file descriptor (I saw this with netty), so raise the limit. Also on Linux you may set TIME_WAIT to 0s; on Windows the minimum value is 4s (search for the regedit key to change it).

  • reuse the connection. Pyes tries to do this automatically.

  • in Python, socket operations are GIL-locked; to improve performance use eventlet (see the second sketch below).
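
As a rough illustration of the reading point above, a minimal sketch; the
file name and indexing call are placeholders:

# Sketch: iterate over the file object, which reads ahead in buffered
# chunks, instead of calling readline() once per document.
import json

with open("tweets.json", "r") as f:    # placeholder file name
    for line in f:
        doc = json.loads(line)
        index_doc(doc)                 # placeholder for the actual indexing call

And a sketch of the eventlet idea, assuming a simple per-file worker
function; all names here are illustrative only:

# Sketch: run several indexing workers as eventlet green threads; with
# monkey-patched sockets the I/O is multiplexed inside one process.
import eventlet
eventlet.monkey_patch()                # make socket operations cooperative

def index_file(path):
    # placeholder: open a connection and index every document in `path`
    pass

pool = eventlet.GreenPool(14)          # one green thread per input file
for path in ["part-%02d.json" % i for i in range(14)]:
    pool.spawn(index_file, path)
pool.waitall()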

If you need any other help, I'm on IRC: irc.freenode.net, #elasticsearch.

Hi,
Alberto

On Tue, Mar 22, 2011 at 2:43 PM, Alberto Paro alberto.paro@gmail.com wrote:

  • file descriptor and socket problems are very nasty, as Shay said. Every
    connection uses a file descriptor (I saw this with netty), so raise the
    limit.

This isn't about Netty, it's just how BSD sockets work. One
connection = one file descriptor.

  • Also on Linux you may set TIME_WAIT to 0s

Don't do this. There's a reason why TIME_WAIT exists. In general,
don't tune any TCP knob unless you really know what you're doing.
Note: if you haven't read the TCP implementation in the Linux kernel,
chances are high that you don't know what you're doing. Don't listen
to blog posts and others that recommend tuning all the TCP parameters
up or down to extreme values; they will generally make your problem
worse, even if you don't realize it immediately.

This is the type of process you should go through before making any
change to TCP knobs on Linux:

--
Benoit "tsuna" Sigoure
Software Engineer @ www.StumbleUpon.com