Hi,
First of all, I could not find any documentation for setting up bulk loading with pyes other than the Python ES help pages and reading the source code. I now have a demonstration running, but it only holds up to a limited volume before errors occur. My report is below.
I am using the pyes module's bulk loading against elasticsearch 0.15.2. I am attempting to load about 6 GB of Twitter-like documents onto a 14-instance Xen virtual cluster. Each instance has 2 processors, 4 GB of memory, and 50 GB of storage. I construct the ES object with the array of Xen instance addresses, 10 retries, and a bulk_size of 400, 1000, 2000, or 10000 (one value per run).
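Concretely, the connection is built roughly like this (the host suffixes here are placeholders; the listing further down uses bulk_size=10000, and I swap in the other values between runs):

from pyes import ES

# 14 Xen instances, Thrift transport on port 9500 (suffixes are placeholders)
hosts = ["192.168.0." + str(n) + ":9500" for n in range(2, 16)]
conn = ES(hosts, bulk_size=400, max_retries=10)  # also run with 1000, 2000, 10000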
My index has 28 shards and no replicas, and I have a mapping set up with a text field that is indexed.
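The settings come from a property file that set_index (in the listing below) reads. Roughly, it looks like the sketch below; only the shard and replica counts are the real values, and the exact settings keys are whatever pyes's create_index passes through:

{
    "index": {
        "name": "twitter",
        "properties": {
            "number_of_shards": 28,
            "number_of_replicas": 0
        }
    }
}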
On a local server I increased the file descriptor limit to 65535 and used 4 shards, but that did not eliminate the error. I cannot load more than about 2 GB of documents without a server timeout error. While the load is running I can index more than 2,000 documents per second, but I need to sustain that rate without errors.
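For what it is worth, the only guard I can think of is a crude retry wrapper like the sketch below (retry_index is a hypothetical helper of mine and the backoff values are arbitrary); it would only mask the timeouts rather than cure whatever is making the nodes drop out:

import time
from pyes.exceptions import NoServerAvailable

def retry_index(conn, doc, index_name, type_name, doc_id, attempts=5):
    # hypothetical stopgap: back off and retry when every server is
    # reported unavailable; this hides the problem rather than fixing it
    for attempt in range(attempts):
        try:
            return conn.index(doc, index_name, type_name, doc_id, bulk=True)
        except NoServerAvailable:
            time.sleep(2 ** attempt)
    raise NoServerAvailable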
What should I do?
Below is a listing of my code and the error message:
import json
import datetime
import time
import thread
import os
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated
from pyes import *
from gzip import GzipFile

class DocWriter:

    def __init__(self, hosts):
        self.key_id = 0
        self.port = '9500'
        self.server_properties = {"base": "192.168.0.", "port": "9500"}
        self.index = {"name": "twitter"}
        self.type = {"name": "tweet"}
        self.hosts = map(self.set_host, hosts)
        self.property_files = {"base": "/root/F7setup",
                               "index": "index_settings",
                               "mapping": "tweet_mapping"}
        self.data_path = '/root/F7setup/Cluster/data'
        self.connect = self.connection()
        self.set_files()
        try:
            self.set_index()
        except Exception:
            print "Index: " + self.index['name'] + " already exists"
        else:
            print "Index: " + self.index['name'] + " is set"
        self.set_mapping()

    def set_host(self, host):
        # build "192.168.0.<suffix>:9500" from a host suffix
        return self.server_properties['base'] + host + ":" + self.server_properties['port']

    def connection(self):
        return ES(self.hosts, bulk_size=10000, max_retries=10)

    def delete_index(self):
        self.connect.delete_index(self.index['name'])

    def set_index(self):
        path = self.property_files['base'] + "/" + self.property_files['index']
        with open(path, 'r') as f:
            self.index = json.loads(f.read())['index']
        self.connect.create_index(self.index['name'], self.index['properties'])

    def set_mapping(self):
        path = self.property_files['base'] + "/" + self.property_files['mapping']
        with open(path, 'r') as f:
            self.type = json.loads(f.read())['type']
        self.connect.put_mapping(self.type['name'], self.type, self.index['name'])

    def set_files(self):
        self.data_files = self.get_files()

    def get_files(self):
        return map(lambda x: self.data_path + "/" + x, os.listdir(self.data_path))

    def write_files_to_es(self, file_count_offset=0):
        start = datetime.datetime.now()
        self.set_files()
        del self.data_files[0:file_count_offset]
        for file in self.data_files:
            self.write_block_to_es(file)
            print "document count: " + str(self.key_id)
            print file + " written to es"
        finish = datetime.datetime.now()
        print "duration: ", finish - start

    def write_file_to_es(self, file):
        # index one gzipped file of JSON lines, one document per request
        with GzipFile(file, 'r') as f:
            line = f.readline()
            while 0 < len(line):
                self.key_id = self.key_id + 1
                doc = json.loads(line, object_hook=self.fix_obj)
                response = self.connect.index(doc, self.index['name'],
                                              self.type['name'], self.key_id)
                try:
                    response['ok']
                except KeyError:
                    # no 'ok' flag in the response means the write failed
                    print "file name: ", file, " count: ", self.key_id
                    print response
                    return self.key_id
                line = f.readline()
        return self.key_id

    def write_block_to_es(self, file):
        # queue documents with bulk=True; pyes flushes the buffer
        # whenever it reaches bulk_size
        with GzipFile(file, 'r') as f:
            line = f.readline()
            while 0 < len(line):
                self.key_id = self.key_id + 1
                doc = json.loads(line, object_hook=self.fix_obj)
                self.connect.index(doc, self.index['name'],
                                   self.type['name'], self.key_id, bulk=True)
                line = f.readline()
        return self.key_id

    def write_file_line_to_es(self, file, line_no):
        # index a single numbered line from a file, for spot checks
        line_cnt = 1
        with GzipFile(file, 'r') as f:
            line = f.readline()
            while 0 < len(line):
                self.key_id = self.key_id + 1
                if line_cnt == line_no:
                    print line
                    doc = json.loads(line, object_hook=self.fix_obj)
                    self.connect.index(doc, self.index['name'],
                                       self.type['name'], self.key_id)
                    break
                line_cnt = line_cnt + 1
                line = f.readline()
        return self.key_id

    def fix_obj(self, obj):
        # convert numeric id/count fields to strings before indexing
        if 'in_reply_to_user_id' in obj:
            obj['in_reply_to_user_id'] = str(obj['in_reply_to_user_id'])
        if 'retweeted_status' in obj:
            if 'retweet_count' in obj['retweeted_status']:
                print obj['retweeted_status']['retweet_count']
                obj['retweeted_status']['retweet_count'] = \
                    str(obj['retweeted_status']['retweet_count'])
        elif 'retweet_count' in obj:
            obj['retweet_count'] = str(obj['retweet_count'])
        return obj

    def iterate(self, loop_count, delay=0):
        time.sleep(delay)
        while 0 < loop_count:
            loop_count = loop_count - 1
            self.write_files_to_es()
        return self.index
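
For completeness, this is how I drive the loader from the interactive console (the host suffixes are placeholders for my 14 instances):

writer = DocWriter([str(n) for n in range(2, 16)])  # placeholder suffixes
writer.write_files_to_es()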
error message:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "doc_writer.py", line 58, in write_files_to_es
    self.write_block_to_es(file)
  File "doc_writer.py", line 87, in write_block_to_es
    response = self.connect.index(line, self.index['name'],
      self.type['name'], self.key_id, bulk=True)
  File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/es.py", line 519, in index
    self.flush_bulk()
  File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/es.py", line 543, in flush_bulk
    self.force_bulk()
  File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/es.py", line 551, in force_bulk
    self._send_request("POST", "/_bulk", self.bulk_data.getvalue())
  File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/es.py", line 197, in _send_request
    response = self.connection.execute(request)
  File "/usr/lib/python2.7/site-packages/pyes-0.14.1-py2.7.egg/pyes/connection.py", line 174, in _client_call
    raise NoServerAvailable
pyes.exceptions.NoServerAvailable