Below is my code and mapping to create a new index and load data from pandas into Elasticsearch.
I have around 400K records, with the potential to increase to 2 million.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, streaming_bulk, parallel_bulk

config = {
    'host': 'xxx.xx.liv', 'port': xxxx
}
es = Elasticsearch([config])
MAPPING:
request_body = {
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1
    },
    "mappings": {
        "Product": {
            "properties": {
                "ipc": {"type": "keyword"},
                "O_D_P": {"type": "float"},
                "O_D_P_P": {"type": "float"},
                "A_P": {"type": "float"},
                "O": {"type": "integer"},
                "D_P": {"type": "float"},
                "D_P_P": {"type": "float"},
                "L_I": {"type": "float"},
                "H_I": {"type": "float"}
            }
        }
    }
}
print("creating 'example_index' index...")
res = es.indices.delete(index = 'ipc')
es.indices.create(index = 'ipc', body = request_body)
DATA:
bulk_data = []
for index, row in DataFrame.iterrows():
    # build a plain dict of column name -> value for this row
    data_dict = {}
    for i in range(len(row)):
        data_dict[DataFrame.columns[i]] = row[i]
    # action line for the raw bulk API, followed by the document itself
    op_dict = {
        "index": {
            "_index": 'ipc',
            "_type": 'Product',
            "_id": data_dict['ipc']
        }
    }
    bulk_data.append(op_dict)
    bulk_data.append(data_dict)
res = es.bulk(index='ipc', body=bulk_data, refresh=True, request_timeout=360000)
es.search(body={"query": {"match_all": {}}}, index='ipc')
es.indices.get_mapping(index='ipc')
I am getting a Broken Pipe error. Is there a way to load the data in chunks?
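One workaround I am considering is to keep the alternating action/document list built above, but send it to es.bulk in slices so no single request gets too large. This is only a sketch: CHUNK_DOCS is a tuning guess, and it reuses the es client and bulk_data from above.

CHUNK_DOCS = 1000                 # documents per request (tuning guess)
STEP = CHUNK_DOCS * 2             # each document takes two list entries (action + source)

for start in range(0, len(bulk_data), STEP):
    chunk = bulk_data[start:start + STEP]
    res = es.bulk(index='ipc', body=chunk, request_timeout=120)
    if res.get('errors'):
        print('errors in chunk starting at', start)

es.indices.refresh(index='ipc')   # refresh once at the end instead of on every request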
I also tried using streaming_bulk:
res = streaming_bulk(client=es, actions=bulk_data, index='ipc', chunk_size=1, max_retries=5,
                     initial_backoff=2, max_backoff=600, request_timeout=3600, refresh=True, yield_ok=True)
for ok, response in res:
    print(ok, response)
es.search(body={"query": {"match_all": {}}}, index='ipc')
es.indices.get_mapping(index='ipc')
ERROR:
RequestError: RequestError(400, 'action_request_validation_exception', 'Validation Failed: 1: type is missing;'
How can I insert in chunks?
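For reference, this is the action format I believe the bulk helpers expect: one dict per document with the body under _source, rather than the alternating action/source list that es.bulk takes. It is only a sketch, assuming an ES 5.x/6.x cluster where the 'Product' type exists, and it reuses the es client and DataFrame from above.

from elasticsearch.helpers import streaming_bulk

def generate_actions(df):
    # one action dict per row; the helper builds the action line and _source body itself
    for _, row in df.iterrows():
        doc = row.to_dict()
        yield {
            "_index": "ipc",
            "_type": "Product",   # still required on 5.x/6.x clusters
            "_id": doc["ipc"],
            "_source": doc,
        }

for ok, response in streaming_bulk(es, generate_actions(DataFrame), chunk_size=500,
                                   max_retries=5, request_timeout=120):
    if not ok:
        print(response)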