Hi Team,
Please check the code below, which uses Python 3 and es.bulk() to ingest a large CSV file into Elasticsearch. The result is that no data gets ingested. The for loop does not appear to iterate. What's wrong with the for loop?
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 29 11:13:55 2019

@author: I073341
"""
# load packages
from elasticsearch import Elasticsearch
import pandas as pd
import time

root_path = "C:/elkstack/elasticsearch-7.0.1-windows-x86_64/data/"
raw_data_path = root_path + "testcase/"
csv_filename = "testcase0801.csv"

t0 = time.time()

# size of the bulk
chunksize = 5000

# open csv file and parse it with pandas in chunks
f = open(raw_data_path + csv_filename)
csvfile = pd.read_csv(f, iterator=True, chunksize=chunksize)

# init a new instance of the Elasticsearch client class
es = Elasticsearch('http://localhost:9200/')

# init a mapping
mapping = {
    "index_patterns": ["test*"],
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1,
        "refresh_interval": "5s"
    },
    "mappings": {
        "_doc": {
            "dynamic_templates": [
                {
                    "All": {
                        "match": "*",
                        "match_mapping_type": "*",
                        "mapping": {
                            "type": "text",
                            "fields": {
                                "raw": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        }
                    }
                }
            ]
        }
    }
}

# put a template
es.indices.put_template(name='t1', body=mapping, include_type_name=True)

# init index
try:
    es.indices.delete("testcase")
except:
    pass
es.indices.create(index="testcase")

# start bulk indexing
print("now indexing %s..." % csv_filename)
for i, df in enumerate(csvfile):
    print(i)
    records = df.where(pd.notnull(df), None).T.to_dict()
    list_records = [records[it] for it in records]
    try:
        es.bulk(index="testcase", doc_type="_doc", body=list_records)
    except:
        print("error!, skip some test case sorry")
        pass

print("done in %.3fs" % (time.time() - t0))
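(Side note: I realize the bare except in the loop swallows whatever es.bulk() raises, so the log below only shows my generic message. An untested tweak to the loop body above that would surface the real error next time:

    try:
        es.bulk(index="testcase", doc_type="_doc", body=list_records)
    except Exception as exc:
        # print the underlying exception instead of hiding it
        print("bulk request failed:", exc)
)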
Here is the output log
runfile('C:/Users/I073341/.spyder-py3/esbulkimport.py', wdir='C:/Users/I073341/.spyder-py3')
now indexing testcase0801.csv...
0
error!, skip some test case sorry
done in 0.475s
And here are the values of the variables at run time:
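One thing I am wondering about: does es.bulk() need an action line interleaved with every document? list_records is just a plain list of row dicts, with no index actions. If that is the problem, here is a sketch of what I would try with the helpers.bulk wrapper from the same elasticsearch package, which builds the action/source pairs itself (untested against my data; make_actions is just a name I made up, and the "_type" key may be unnecessary on newer clients since doc types are deprecated in 7.x):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch('http://localhost:9200/')

def make_actions(docs, index="testcase"):
    # Wrap each row dict in the metadata fields helpers.bulk expects.
    for doc in docs:
        yield {"_index": index, "_type": "_doc", "_source": doc}

# inside the for loop, replacing the es.bulk(...) call:
#     helpers.bulk(es, make_actions(list_records))

Does that sound like the right direction, or is something else going on?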