Pyes bulk loading no server available error


(Wolf-2) #1

Hi,

First of all I cannot find any documentation other than python ES help
pages or reading source code to setup bulk loading with pyes. However
I am now running a demonstration with a limited capacity before errors
occur. My report is below.

I am using the pyes module bulk loading application with elasticsearch
0.15.2. I am attempting to load 6G of documents of twitter-like
content onto a 14 instance Xen virtual cluster. Each instance has 2
processors, 4GB of data and 50GB of storage. I am using the ES
facility constructor with all with the array of Xen instances, 10
retries and bulk_size runs of 400, 1000, 2000 or 10000.

My index has 28 shards with no replicas. I have a mapping setup with a
text field being indexed.

On a local server I have increased my file descriptor limit to 65535
and used 4 shards but I have not been able to eliminate the error.

I am not able to load more than 2G of documents without a server timeout
error.

During operation I am able to load more than 2K of documents per
second but I need to sustain the input without errors.

What should I do?

Below is a listing of my code and the error message:

import json
import datetime
import time
import thread
import os
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated
from pyes import *
from gzip import GzipFile

class DocWriter:
def init(self, hosts):
self.key_id = 0
self.port = '9500'
self.server_properties = { "base": "192.168.0.", "port": "9500" }
self.index = { "name": "twitter" }
self.type = { "name": "tweet" }
self.hosts = map(self.set_host, hosts)
self.property_files = { "base": "/root/F7setup", "index":
"index_settings", "mapping": "tweet_mapping" }
self.data_path = '/root/F7setup/Cluster/data'
self.connect = self.connection()
self.set_files()
try:
self.set_index()
except Exception:
print "Index: " + self.index['name'] + " already exists"
else:
print "Index: " + self.index['name'] + " is set"
self.set_mapping()
def set_host(self, host):
return self.server_properties['base'] + host + ":" +
self.server_properties['port']
def connection(self):
return ES(self.hosts, bulk_size=10000, max_retries=10)

def delete_index(self):
self.connect.delete_index(self.index['name'])
def set_index(self):
with open(self.property_files['base'] + "/" +
self.property_files['index'], 'r') as f:
index=json.loads(f.read())
self.index=index=index['index']
self.connect.create_index(index['name'], index['properties'])
def set_mapping(self):
with open(self.property_files['base']
+"/"+self.property_files['mapping'], 'r') as f:
self.type=json.loads(f.read())
self.type=self.type['type']
self.connect.put_mapping(self.type['name'], self.type,
self.index['name'])
def set_files(self):
self.data_files = self.get_files()
def get_files(self):
return map(lambda x: self.data_path + "/" + x,
os.listdir(self.data_path))
def write_files_to_es(self, file_count_offset=0):
start = datetime.datetime.now()
self.set_files()
del self.data_files[0:file_count_offset]
for file in self.data_files:
self.write_block_to_es(file)
print "document count: " + str(self.key_id)
print file + " written to es"
finish = datetime.datetime.now()
print "time span: ", finish - start
print " duration: ", finish - start
def write_file_to_es(self, file):
with GzipFile(file, 'r') as f:
line = f.readline()
while 0 < len(line):
self.key_id = self.key_id + 1
line = json.loads(line, object_hook=self.fix_obj)
response = self.connect.index(line, self.index['name'],
self.type['name'], self.key_id)
try: response['ok']
except NameError:
print "file name: ", file, " count: ", self.key_id
print response
f.close
return self.key_id
else:
line = f.readline()
f.closed
return self.key_id
def write_block_to_es(self, file):
with GzipFile(file, 'r') as f:
line = f.readline()
while 0 < len(line):
self.key_id = self.key_id + 1
line = json.loads(line, object_hook=self.fix_obj)
response = self.connect.index(line, self.index['name'],
self.type['name'], self.key_id, bulk=True)
line = f.readline()

f.closed
return self.key_id
def write_file_line_to_es(self, file, line_no):
line_cnt = 1
with GzipFile(file, 'r') as f:
line = f.readline()
while 0 < len(line):
self.key_id = self.key_id + 1
if line_cnt == line_no:
print line
line = json.loads(line, object_hook=self.fix_obj)
response = self.connect.index(line, self.index['name'],
self.type['name'], self.key_id)
break
line_cnt = line_cnt + 1
line = f.readline()
f.closed
return self.key_id
def fix_obj(self, obj):
if 'in_reply_to_user_id' in obj:
obj['in_reply_to_user_id'] = str(obj['in_reply_to_user_id'])
if 'retweeted_status' in obj:
if 'retweet_count' in obj['retweeted_status']:

print(obj['retweeted_status']['retweet_count'])

obj['retweeted_status']['retweet_count'] = str(obj['retweeted_status']['retweet_count'])
obj['retweeted_status']=obj['retweeted_status']
elif 'retweet_count' in obj:
obj['retweet_count'] = str(obj['retweet_count'])
return obj
def iterate(self, loop_count, delay=0):
time.sleep(delay)

while 0 < loop_count:
loop_count = loop_count - 1
self.write_file_to_es()
return self.index

error message:

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "doc_writer.py", line 58, in write_files_to_es
self.write_block_to_es(file)
File "doc_writer.py", line 87, in write_block_to_es
response = self.connect.index(line, self.index['name'],
self.type['name'], self.key_id, bulk=True)
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
es.py", line 519, in index
self.flush_bulk()
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
es.py", line 543, in flush_bulk
self.force_bulk()
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
es.py", line 551, in force_bulk
self._send_request("POST", "/_bulk", self.bulk_data.getvalue())
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
es.py", line 197, in _send_request
response = self.connection.execute(request)
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
connection.py", line 174, in _client_call
raise NoServerAvailable
pyes.exceptions.NoServerAvailable


(Clinton Gormley) #2

Hi Wolf

I wonder if you're running into swap problems, or your garbage
collections are slow because some of the JVM is being swapped out?

Try turning swap off, it may help.

Also, see this blog post about what's in master:

http://www.elasticsearch.org/blog/2011/03/23/update-settings.html

hth

clint


(Wolf-2) #3

Hi I am getting better results by extending the timeout limit to 10
sec but there are probably other issues to address.

Regards,

Wolf

On Mar 26, 2:58 pm, Wolf wolfk...@gmail.com wrote:

Hi,

First of all I cannot find any documentation other than python ES help
pages or reading source code to setup bulk loading with pyes. However
I am now running a demonstration with a limited capacity before errors
occur. My report is below.

I am using the pyes module bulk loading application with elasticsearch
0.15.2 . I am attempting to load a 6G documents of twitter like
content onto a 14 instance Xen virtual cluster. Each instance has 2
processors, 4GB of data and 50GB of storage. I am using the ES
facility constructor with all with the array of Xen instances, 10
retries and bulk_size runs of 400, 1000, 2000 or 10000.

My index has 28 shards with no replicas. I have a mapping setup with a
text field being indexed.

On a local server I have increased my file descriptor limit to 65535
and used 4 shards but I have not been able to eliminate the error.

I am not able to load more than 2G of documents without a server timeout
error.

During operation I am able to load more than 2K of documents per
second but I need to sustain the input without errors.

What should I do?

Below is a listing of my code and the error message:

import json
import datetime
import time
import thread
import os
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated
from pyes import *
from gzip import GzipFile

class DocWriter:
def init(self, hosts):
self.key_id = 0
self.port = '9500'
self.server_properties = { "base": "192.168.0.", "port": "9500" }
self.index = { "name": "twitter" }
self.type = { "name": "tweet" }
self.hosts = map(self.set_host, hosts)
self.property_files = { "base": "/root/F7setup", "index":
"index_settings", "mapping": "tweet_mapping" }
self.data_path = '/root/F7setup/Cluster/data'
self.connect = self.connection()
self.set_files()
try:
self.set_index()
except Exception:
print "Index: " + self.index['name'] + " already exists"
else:
print "Index: " + self.index['name'] + " is set"
self.set_mapping()
def set_host(self, host):
return self.server_properties['base'] + host + ":" +
self.server_properties['port']
def connection(self):
return ES(self.hosts, bulk_size=10000, max_retries=10)

def delete_index(self):
self.connect.delete_index(self.index['name'])
def set_index(self):
with open(self.property_files['base'] + "/" +
self.property_files['index'], 'r') as f:
index=json.loads(f.read())
self.index=index=index['index']
self.connect.create_index(index['name'], index['properties'])
def set_mapping(self):
with open(self.property_files['base']
+"/"+self.property_files['mapping'], 'r') as f:
self.type=json.loads(f.read())
self.type=self.type['type']
self.connect.put_mapping(self.type['name'], self.type,
self.index['name'])
def set_files(self):
self.data_files = self.get_files()
def get_files(self):
return map(lambda x: self.data_path + "/" + x,
os.listdir(self.data_path))
def write_files_to_es(self, file_count_offset=0):
start = datetime.datetime.now()
self.set_files()
del self.data_files[0:file_count_offset]
for file in self.data_files:
self.write_block_to_es(file)
print "document count: " + str(self.key_id)
print file + " written to es"
finish = datetime.datetime.now()
print "time span: ", finish - start
print " duration: ", finish - start
def write_file_to_es(self, file):
with GzipFile(file, 'r') as f:
line = f.readline()
while 0 < len(line):
self.key_id = self.key_id + 1
line = json.loads(line, object_hook=self.fix_obj)
response = self.connect.index(line, self.index['name'],
self.type['name'], self.key_id)
try: response['ok']
except NameError:
print "file name: ", file, " count: ", self.key_id
print response
f.close
return self.key_id
else:
line = f.readline()
f.closed
return self.key_id
def write_block_to_es(self, file):
with GzipFile(file, 'r') as f:
line = f.readline()
while 0 < len(line):
self.key_id = self.key_id + 1
line = json.loads(line, object_hook=self.fix_obj)
response = self.connect.index(line, self.index['name'],
self.type['name'], self.key_id, bulk=True)
line = f.readline()

f.closed
return self.key_id
def write_file_line_to_es(self, file, line_no):
line_cnt = 1
with GzipFile(file, 'r') as f:
line = f.readline()
while 0 < len(line):
self.key_id = self.key_id + 1
if line_cnt == line_no:
print line
line = json.loads(line, object_hook=self.fix_obj)
response = self.connect.index(line, self.index['name'],
self.type['name'], self.key_id)
break
line_cnt = line_cnt + 1
line = f.readline()
f.closed
return self.key_id
def fix_obj(self, obj):
if 'in_reply_to_user_id' in obj:
obj['in_reply_to_user_id'] = str(obj['in_reply_to_user_id'])
if 'retweeted_status' in obj:
if 'retweet_count' in obj['retweeted_status']:

print(obj['retweeted_status']['retweet_count'])

obj['retweeted_status']['retweet_count'] = str(obj['retweeted_status']['retweet_count'])
obj['retweeted_status']=obj['retweeted_status']
elif 'retweet_count' in obj:
obj['retweet_count'] = str(obj['retweet_count'])
return obj
def iterate(self, loop_count, delay=0):
time.sleep(delay)

while 0 < loop_count:
loop_count = loop_count - 1
self.write_file_to_es()
return self.index

error message:

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "doc_writer.py", line 58, in write_files_to_es
self.write_block_to_es(file)
File "doc_writer.py", line 87, in write_block_to_es
response = self.connect.index(line, self.index['name'],
self.type['name'], self.key_id, bulk=True)
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
es.py", line 519, in index
self.flush_bulk()
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
es.py", line 543, in flush_bulk
self.force_bulk()
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
es.py", line 551, in force_bulk
self._send_request("POST", "/_bulk", self.bulk_data.getvalue())
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
es.py", line 197, in _send_request
response = self.connection.execute(request)
File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/
connection.py", line 174, in _client_call
raise NoServerAvailable
pyes.exceptions.NoServerAvailable


(Wolf-2) #4

Thank you for the pointers Clint.

I set swapoff -a on all servers, cut down jvm memory allocation to 3/4
of 4G physical capacity, set index.refresh_interval: -1 and
index.merge_factor=30

I also added another server, for a total of 15, and extended the shards to
30

I am still getting some rolling performance congestion every 40-60 or
so block transfers of 10K documents, occasionally crashes still occur
after 5.5M documents are loaded with a NoServerAvailable exception.

Hopefully I can get a more sustained load input and replication as my
next objectives.

after setting the # of shard replicas to 1, refresh interval to 1s and
merge factor to 10 the cluster status went to yellow!

what do I do now?

Regards,

Wolf

On Mar 26, 3:47 pm, Clinton Gormley clin...@iannounce.co.uk wrote:

Hi Wolf

I wonder if you're running into swap problems, or your garbage
collections are slow because some of the JVM is being swapped out?

Try turning swap off, it may help.

Also, see this blog post about what's in master:

http://www.elasticsearch.org/blog/2011/03/23/update-settings.html

hth

clint


(Wolf-2) #5

Thank you for the pointers Clint.

I set swapoff -a on all servers, cut down jvm memory allocation to 3/4
of 4G physical capacity, set index.refresh_interval: -1 and
index.merge_factor=30

I also added an another server to total 15 and extended the shards to
30

I am still getting some rolling performance congestion every 40-60 or
so block transfers of 10K documents, occasionally crashes still occur
after 5.5M documents are loaded with a NoServerAvailable exception.

Hopefully I can get a more sustained load input and replication as my
next objectives.

after setting the # of shard replicas to 1, refresh interval to 1s and
merge factor to 10 the cluster status went to yellow!

what do I do now?

Regards,

Wolf

On Mar 26, 3:47 pm, Clinton Gormley clin...@iannounce.co.uk wrote:

Hi Wolf

I wonder if you're running into swap problems, or your garbage
collections are slow because some of the JVM is being swapped out?

Try turning swap off, it may help.

Also, see this blog post about what's in master:

http://www.elasticsearch.org/blog/2011/03/23/update-settings.html

hth

clint


(Wolf-2) #6

I have progressively been improving my performance with the python API
although better block performance still has inconsistencies and server
not found exception blowups.

Should I use the Java API for bulk loading? Possibly this would give
me better control.

Peter Karich has an intro article on the API to get me started:

Regards,

Wolf

On Mar 27, 4:57 pm, Wolf wolfk...@gmail.com wrote:

Thank you for the pointers Clint.

I set swapoff -a on all servers, cut down jvm memory allocation to 3/4
of 4G physical capacity, set index.refresh_interval: -1 and
index.merge_factor=30

I also added an another server to total 15 and extended the shards to
30

I am still getting some rolling performance congestion every 40-60 or
so block transfers of 10K documents, occasionally crashes still occur
after 5.5M documents are loaded with a NoServerAvailable exception.

Hopefully I can get a more sustained load input and replication as my
next objectives.

after setting the # of shard replicas to 1, refresh interval to 1s and
merge factor to 10 the cluster status went to yellow!

what do I do now?

Regards,

Wolf

On Mar 26, 3:47 pm, Clinton Gormley clin...@iannounce.co.uk wrote:

Hi Wolf

I wonder if you're running into swap problems, or your garbage
collections are slow because some of the JVM is being swapped out?

Try turning swap off, it may help.

Also, see this blog post about what's in master:

http://www.elasticsearch.org/blog/2011/03/23/update-settings.html

hth

clint


(Shay Banon) #7

Hey,

I am not familiar with the pyes API in great detail — when is the server not found exception being raised? Maybe you can gist the code that uses pyes,

-shay.banon
On Monday, March 28, 2011 at 2:33 AM, Wolf wrote:

I have progressively been improving my performance with the python API
although better block performance still has inconsistencies and server
not found exception blowups.

Should I use the Java API for bulk loading? Possibly this would give
me better control.

Peter Karich has a intro article on the API to get me started:

http://java.dzone.com/articles/get-started-elasticsearch

Regards,

Wolf

On Mar 27, 4:57 pm, Wolf wolfk...@gmail.com wrote:

Thank you for the pointers Clint.

I set swapoff -a on all servers, cut down jvm memory allocation to 3/4
of 4G physical capacity, set index.refresh_interval: -1 and
index.merge_factor=30

I also added an another server to total 15 and extended the shards to
30

I am still getting some rolling performance congestion every 40-60 or
so block transfers of 10K documents, occasionally crashes still occur
after 5.5M documents are loaded with a NoServerAvailable exception.

Hopefully I can get a more sustained load input and replication as my
next objectives.

after setting the # of shard replicas to 1, refresh interval to 1s and
merge factor to 10 the cluster status went to yellow!

what do I do now?

Regards,

Wolf

On Mar 26, 3:47 pm, Clinton Gormley clin...@iannounce.co.uk wrote:

Hi Wolf

I wonder if you're running into swap problems, or your garbage
collections are slow because some of the JVM is being swapped out?

Try turning swap off, it may help.

Also, see this blog post about what's in master:

http://www.elasticsearch.org/blog/2011/03/23/update-settings.html

hth

clint


(Wolf-2) #8

Hi Shay,

Here is the gist link:

git://gist.github.com/890954.git

I run the program as follows:

hosts = [<list of last digit of ipv4 address, 192.168.0. is assumed by
program>]

from doc_writer import DocWriter

dw = DocWriter(hosts) # if no argument given the local host is the
default

dw.write_files_to_es()


I was able to run this program for 6 hours last night and load 50M
tweets with only the text field indexed.

I have not attempted to set up a single replica yet since last time I
got a yellow state.

The JVM swap on seems to be the primary cause of the Server Not Found
Exception, Clint gave me the concept of this.

I am loading 10K tweets per bulk insert typically in 2.4 to 4.6
seconds but I still do notice a rolling congestion in the network
wherein a delay of up to 1 minute between bulk inserts occurs.

I may move to Java API to control the loading and application of the
system more handily.

I need to load 20M tweets per day and support up to real time queries.
I can extend my VM instances if necessary.

Wolf


here is the index setting I was using

{ "index":
{ "name": "twitter",
"properties":
{ "index":
{ "numberOfShards": 30, "numberOfReplicas": 0,
"refresh_interval" : "10s",
"merge.policy.merge_factor" : 30,
"analysis":
{ "analyzer":
{ "collation":
{ "tokenizer": "keyword",
"filter": ["myCollator"]
},
"my_analyzer":
{ "type": "igo"
}
},
"filter":
{ "myCollator":
{ "type": "icu_collation",
"language": "ja"
}
}
}
}
}
}
}

On Mar 28, 3:18 am, Shay Banon shay.ba...@elasticsearch.com wrote:

Hey,

I am not familiar with the pyes API to great details, when does the server not found exception is being raised? Maybe you can gist the code that uses pyes,

-shay.banon

On Monday, March 28, 2011 at 2:33 AM, Wolf wrote:

I have progressively been improving my performance with the python API
although better block performance still has inconsistencies and server
not found exception blowups.

Should I use the Java API for bulk loading? Possibly this would give
me better control.

Peter Karich has a intro article on the API to get me started:

http://java.dzone.com/articles/get-started-elasticsearch

Regards,

Wolf

On Mar 27, 4:57 pm, Wolf wolfk...@gmail.com wrote:

Thank you for the pointers Clint.

I set swapoff -a on all servers, cut down jvm memory allocation to 3/4
of 4G physical capacity, set index.refresh_interval: -1 and
index.merge_factor=30

I also added an another server to total 15 and extended the shards to
30

I am still getting some rolling performance congestion every 40-60 or
so block transfers of 10K documents, occasionally crashes still occur
after 5.5M documents are loaded with a NoServerAvailable exception.

Hopefully I can get a more sustained load input and replication as my
next objectives.

after setting the # of shard replicas to 1, refresh interval to 1s and
merge factor to 10 the cluster status went to yellow!

what do I do now?

Regards,

Wolf

On Mar 26, 3:47 pm, Clinton Gormley clin...@iannounce.co.uk wrote:

Hi Wolf

I wonder if you're running into swap problems, or your garbage
collections are slow because some of the JVM is being swapped out?

Try turning swap off, it may help.

Also, see this blog post about what's in master:

http://www.elasticsearch.org/blog/2011/03/23/update-settings.html

hth

clint


(Alberto Paro-2) #9

Il giorno 28/mar/2011, alle ore 20.36, Wolf ha scritto:

To reduce problems with memory allocation, usually I set the ES_MIN_MEM=ES_MAX_MEM.

To improve the import speed, try to use an eventlet pool in which you define a connection for every server to parallelize the inserting.

(I'm writing the documentation for pyes, so be patient. But the tests dir cover a lot of cases)

Also I'll implement some helpers to set the index in "bulk mode" as http://www.elasticsearch.org/blog/2011/03/23/update-settings.html after checking that ES is at least a 0.16 version

Let me know if you discover any problems with pyes (possibly via github or IRC), so I can improve it.

Hi,
Alberto Paro


(Wolf-2) #10

Hi Alberto,

Thank you for getting back.

Presently I am generating a pyes ES instance with a list of node IP
addresses. Does this generate the eventlet pool? If not could you be
more specific. My code is on gist:

Here is the gist link:

    git://gist.github.com/890954.git

I run the program as follows from the python command prompt:

hosts = [<list of last digit of ipv4 address, 192.168.0. is assumed by
program>]

from doc_writer import DocWriter

dw = DocWriter(hosts) # if no argument is given the local host is the
default

dw.write_files_to_es()


I have set the ES_MIN_MEM = ES_MAX_MEM

Regards,

Wolf

On Mar 30, 1:20 pm, Alberto Paro alberto.p...@gmail.com wrote:

Il giorno 28/mar/2011, alle ore 20.36, Wolf ha scritto:

To reduce problems with memory allocation, usually I set the ES_MIN_MEM=ES_MAX_MEM.

To improve the import speed, try to use an eventlet pool in which you define a connection for every server to parallelize the inserting.

(I'm writing the documentation for pyes, so be patient. But the tests dir cover a lot of cases)

Also I'll implement some helpers to set the index in "bulk mode" as http://www.elasticsearch.org/blog/2011/03/23/update-settings.html after checking that ES is at least a 0.16 version

Let me know, if you discovery some problems with pyes (possibly via github or IRC), so I can improve it.

Hi,
Alberto Paro


(Alberto Paro-2) #11

Il giorno 31/mar/2011, alle ore 10.20, Wolf ha scritto:

Hi Alberto,

Thank you for getting back.

Presently I am generating and pyes ES instance with a list of node IP
addresses. Does this generate the eventlet pool? If not could you be
more specific. My code is on gist:

I'll give you some references http://eventlet.net/doc/modules/greenpool.html :

The flow is this one put your code:

def bulk_data_insert(server, filedata):
#create es connection
# load data
# do bulk insert
#results

def mygenerator(servers, filestoprocess):
count = 0
for filename in filestoprocess:
#load data from filename
yield servers[count%len(servers)], data
count += 1

def main():
#init green threadpool size==num server
#give them to process to pool
pool = GreenPool(len(servers))
for result in pool.imap(bulk_data_insert, mygenerator(servers, filestoprocess)):
print result

NOTE:
Using eventlet you have non-blocking calls. You can consider eventlet as a "node.js python version".
If you sent too much data and you don't have a lot of memory/CPU on ES server, you'll put it on high load.

Hi,
Alberto


(system) #12