Index from pandas to Elastic Search Using BULK and Parallel BULK

BELOW is my code and mapping to create a new index and load data from pandas to elastic.

I have around 400K records with the potential to increase to 2 Millions.

from elasticsearch.helpers import bulk , streaming_bulk, parallel_bulk
config = {
'host': 'xxx.xx.liv','port':xxxx
es = Elasticsearch([config,], )

request_body = {
"settings" : {
"number_of_shards": 5,
"number_of_replicas": 1

'mappings': {
    'Product': {
        'properties': {
            'ipc': {'type': 'keyword'},
            'O_D_P': {'type': 'float'},
            'O_D_P_P': {'type': 'float'},
            'A_P': {'type': 'float'},
            'O': {'type': 'integer'},
            'D_P': {'type': 'float'},
            'D_P_P': {'type': 'float'},
            'L_I': {'type': 'float'},
            'H_I': {'type': 'float'}

print("creating 'example_index' index...")
res = es.indices.delete(index = 'ipc')
es.indices.create(index = 'ipc', body = request_body)


bulk_data =

for index, row in DataFrame.iterrows():
data_dict = {}
for i in range(len(row)):
data_dict[DataFrame.columns[i]] = row[i]
op_dict = {
"index": {
"_index": 'ipc',
"_type": 'Product',
"_id": data_dict['ipc']

res = es.bulk(index = 'ipc',body = bulk_data,refresh=True,request_timeout=360000){"query": {"match_all": {}}}, index = 'ipc')
es.indices.get_mapping(index = 'ipc')

I am getting Broken Pipe error. Is there a way in which we can load data by chunk?

I tried using Streaming Bulk :

res = streaming_bulk(client=es, actions=bulk_data,index ='ipc', chunk_size=1, max_retries=5,
initial_backoff=2, max_backoff=600, request_timeout=3600,refresh=True, yield_ok=True)
for ok, response in res:
print(ok, response){"query": {"match_all": {}}}, index = 'ipc')
es.indices.get_mapping(index = 'ipc')

RequestError: RequestError(400, 'action_request_validation_exception', 'Validation Failed: 1: type is missing;'

How can I insert in chunks?

I don't know the python api, but could it be a missing value in your call? Try to add a type to your parameters, e.g.:
res = es.bulk(index = 'ipc', doc_type = "_doc", body = bulk_data,refresh=True,request_timeout=360000)

Same error:

ConnectionError: ConnectionError(('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))) caused by: ProtocolError(('Connection aborted.', BrokenPipeError(32, 'Broken pipe')))

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.