[ES7.0.1+python3][HELP] es.bulk() + python3 doesn't ingest CSV data into Elasticsearch

Hi Team,

Please check the code below, which uses python3 and es.bulk() to ingest data from a large CSV file into Elasticsearch. The result is that it doesn't ingest any data, and the for loop doesn't seem to iterate. What's wrong with the for loop?

# -*- coding: utf-8 -*-
"""
Created on Thu Aug 29 11:13:55 2019

@author: I073341
"""

# load package
from elasticsearch import Elasticsearch
import pandas as pd
import time

root_path = "C:/elkstack/elasticsearch-7.0.1-windows-x86_64/data/"
raw_data_path = root_path + "testcase/"
csv_filename = "testcase0801.csv"

t0 = time.time()

# size of the bulk
chunksize = 5000

# open csv file

f = open(raw_data_path + csv_filename) # read csv

# parse csv with pandas
csvfile = pd.read_csv(f,iterator = True, chunksize = chunksize)

# init a new instance of the Elasticsearch client class
es = Elasticsearch('http://localhost:9200/')

# Init a mapping
mapping = {
  "index_patterns": ["test*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "refresh_interval": "5s"
  },
  "mappings": {
    "_doc": {
      "dynamic_templates": [
        {
          "All": {
            "match": "*",
            "match_mapping_type": "*",
            "mapping": {
              "type": "text",
              "fields": {
                "raw": {
                  "type": "keyword",
                  "ignore_above": 256}}}}}]
    }}}

# put a template
es.indices.put_template(name = 't1', body = mapping, include_type_name = True)

# init index
try:
    es.indices.delete("testcase")
except:
    pass

es.indices.create(index = "testcase")

# start bulk indexing
print ("now indexing %s..."%(csv_filename))

for i, df in enumerate(csvfile):
    print (i)
    records = df.where(pd.notnull(df),None).T.to_dict()
    list_records = [records[it] for it in records]
    try: 
        es.bulk(index="testcase", doc_type="_doc", body=list_records)
    except:
        print ("error!, skip some test case sorry")
        pass

    
print ("done in %.3fs"%(time.time()-t0))

Here is the output log

runfile('C:/Users/I073341/.spyder-py3/esbulkimport.py', wdir='C:/Users/I073341/.spyder-py3')
now indexing testcase0801.csv...
0
error!, skip some test case sorry
done in 0.475s

And here are the values of the variables at run time.

Hi @cheriemilk

If you remove the except and share the exception, it may help us understand what is going wrong.

    #try: 
    es.bulk(index="testcase", doc_type="_doc", body=list_records)
    # except:
    #    print ("error!, skip some test case sorry")
    #   pass

Or you can print the exception when it is raised.
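A minimal sketch of that pattern, with `do_bulk` standing in here as a stand-in for the real `es.bulk(...)` call:

```python
def do_bulk():
    # stand-in for es.bulk(...), which raises on a malformed body
    raise ValueError("Malformed action/metadata line [1]")

try:
    do_bulk()
except Exception as exc:
    # print the real cause instead of a generic "error!" message
    message = "bulk failed: %s" % exc
    print(message)
```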

Hi Gabriel,

I removed the except and ran it.

for i, df in enumerate(csvfile):
    print (i)
    records = df.where(pd.notnull(df),None).T.to_dict()
    list_records = [records[i] for i in records]
    reader = csv.DictReader(f)
    es.bulk(index="testcase", doc_type="_doc", body=list_records)

The error in Anaconda is below, and there's no error on the Elasticsearch side.

 File "C:\Users\I073341\AppData\Local\Continuum\anaconda3\lib\site-packages\elasticsearch\connection\http_urllib3.py", line 251, in perform_request
    self._raise_error(response.status, raw_data)

  File "C:\Users\I073341\AppData\Local\Continuum\anaconda3\lib\site-packages\elasticsearch\connection\base.py", line 178, in _raise_error
    status_code, error_message, additional_info

RequestError: RequestError(400, 'illegal_argument_exception', 'Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]')

Thanks,
Cherie

Hi @cheriemilk

The error is in the bulk call: you can't pass the Python list as-is to the bulk method; you need to format your data first.
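The bulk API expects every document to be preceded by an action/metadata line, which is exactly what the "Malformed action/metadata line" error is complaining about. A minimal sketch of building such a body from a flat list of records (the field names here are made up for illustration):

```python
# a flat list of documents, like list_records in the script above
list_records = [
    {"case": "tc-1", "result": "pass"},
    {"case": "tc-2", "result": "fail"},
]

# interleave an action/metadata dict before every document;
# index and doc_type can stay as es.bulk() keyword arguments
body = []
for record in list_records:
    body.append({"index": {}})
    body.append(record)

# es.bulk(index="testcase", doc_type="_doc", body=body)
```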

Here is a link to an example of how to format your data for a bulk request:
https://elasticsearch-py.readthedocs.io/en/master/helpers.html?highlight=bulk#example
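A sketch of that helpers-based approach adapted to the script above, building one action dict per DataFrame row (the column names are illustrative, and `df_to_actions` is a hypothetical helper, not part of the library):

```python
import pandas as pd

def df_to_actions(df, index_name):
    """Yield one bulk action per DataFrame row, in the shape helpers.bulk expects."""
    clean = df.where(pd.notnull(df), None)
    for record in clean.to_dict(orient="records"):
        yield {"_index": index_name, "_type": "_doc", "_source": record}

# illustrative chunk, standing in for one chunk from pd.read_csv(..., chunksize=...)
chunk = pd.DataFrame({"case": ["tc-1", "tc-2"], "result": ["pass", None]})
actions = list(df_to_actions(chunk, "testcase"))
```

With a live cluster this would be driven as `helpers.bulk(es, df_to_actions(df, "testcase"))` inside the chunk loop, after `from elasticsearch import helpers`.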

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.