I am using the Python bulk API to insert metadata into ES. I also parallelized the insertion processing using the Python multiprocessing package, with the index refresh_interval set to 30s.
After I upgraded to ES 2.0, it reports a "time out" message for the first 1,000 elements even though I set the timeout to 30 seconds and even 60 seconds. The previous ES 1.7.2 did not have this problem.
Here is the code snippet; the actual "size" is set to 1,000. If I understand the API correctly, chunk_size is the maximum size and the actual number of elements can be smaller than that. Is this still correct in ES 2.0?
def tcp_worker(ES, sub_lines, index_name, size):
timeout_t = 30
tcp_dump_list = []
j = 0
for line in sub_lines:
line_list = line.split()
s_ip = line_list[2]
s_port = int(line_list[3])
d_ip = line_list[4]
d_port = int(line_list[5])
bytes = int(line_list[6])
packets= int(line_list[7])
state = int(line_list[8])
new_stime = line_list[0] + "T" + line_list[1] + "+08:00"
a_tcp = {
"_index": index_name,
"_type" : "tcp_state_type",
"_source": {
"@ts": new_stime,
"sip": s_ip,
"sport": s_port,
"dip": d_ip,
"dport": d_port,
"packets": packets,
"bytes": bytes,
"state": state}
}
tcp_dump_list.append(a_tcp)
j += 1
if ( (j % size) == 0) :
try:
helpers.bulk(ES, tcp_dump_list, chunk_size=size, request_timeout=timeout_t)
except Exception, e:
print "%dth chunk, Timeout=%d" %(j, timeout_t)
timeout_t = timeout_t + timeout_t
del tcp_dump_list[0:size]
#remaining iterms
if (len(tcp_dump_list)>0) :
try:
helpers.bulk(ES, tcp_dump_list, chunk_size=size, request_timeout=timeout_t)
except Exception, e:
print "Error %s Timeout=%d" %(str(e), timeout_t)