Newbie 1st script having issues - load from SQL to ES

Hello. I am probably doing many things wrong on my baby steps in trying to load from sql to ES using python. I have some syntax error if you can help pls?

import pyodbc
from elasticsearch import Elasticsearch,helpers
from elasticsearch.helpers import bulk, streaming_bulk

ES_HOST = {"host" : "localhost", "port" : 9200}
INDEX_NAME = 'aaa'
TYPE_NAME = 'bbb'
server = 'xxx'
database = 'xxx'
username = 'xxx'
password = 'xxx'
driver= '{ODBC Driver 13 for SQL Server}'
cnxn = pyodbc.connect('DRIVER='+driver+';SERVER='+server+';PORT=1443;DATABASE='+database+';UID='+username+';PWD='+ password)
cursor = cnxn.cursor()
cursor.execute("select top 10 a,b,c,d from table")
rows = cursor.fetchall()

bulk_data = []

for row in rows:
    action = {
        "index": {
            "_index": INDEX_NAME,
            "_type": TYPE_NAME,
            "_source": {
                "a":row.a,
                "b":row.b,
                "c":row.c,
                "d":row.d
                    }
                }
                }
bulk_data.append(action)

es = Elasticsearch(hosts = [ES_HOST])

if es.indices.exists(INDEX_NAME):
    res = es.indices.delete(index = INDEX_NAME)

request_body = {
    "settings" : {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
'mappings': {
	'aaa_map': {
	 'properties': {
'a': {'type': 'text'},
'b': {'type': 'text'},
'c': {'type': 'integer'},
'd': {'type': 'integer'},
	            }}}
}

es.indices.create(index = INDEX_NAME, body = request_body)
es.bulk(index = INDEX_NAME, body = bulk_data)
es.search(index = INDEX_NAME, size=2, body={"query": {"match_all": {}}})

I don't really read Python but I believe that you are not respecting the bulk format here.

Read https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html

thanks for quick reply !! all got sorted... here is the script that worked, I simplified this from the version I used.. the Bulk API was all right, problem was further down in syntax... thanks

from elasticsearch import Elasticsearch, helpers
from time import sleep
import pyodbc
from time import sleep

ES_HOST = {"host" : "localhost", "port" : 9200}
INDEX_NAME = 'aaa'
TYPE_NAME = 'aa'
server = 'xxx'
database = 'xx'
username = 'xxx'
password = 'xxx'
driver= '{ODBC Driver 13 for SQL Server}'
cnxn = pyodbc.connect('DRIVER='+driver+';SERVER='+server+';PORT=1443;DATABASE='+database+';UID='+username+';PWD='+ password)
cursor = cnxn.cursor()
cursor.execute("select a,b,c from xxx")
rows = cursor.fetchall()

es_client = Elasticsearch(hosts = [{ "host" : "localhost", "port" : 9200 }])

bulk_data = []
for row in rows:
    action = {
    "_index": INDEX_NAME,
    "_type": TYPE_NAME,
    "_source": {
        "a": row.a,
        "b": str(row.b),
        "c": str(row.c)
            }
        }
    bulk_data.append(action)

if es_client.indices.exists(INDEX_NAME):
    es_client.indices.delete(index = INDEX_NAME)

request_body = {
    "settings" : {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
'mappings': {
	'aaa': {
	 'properties': {
     'a': {'type': 'date'},
     'b': {'type': 'text'},
          'c': {'type': 'text'}
  
	            }}}
}

es_client.indices.create(index = INDEX_NAME, body = request_body)

helpers.bulk(es_client, bulk_data, refresh=True)

sleep(1)
es_client.indices.refresh(index=INDEX_NAME)

Great. Note that sleep(1) is not needed as you are calling refresh.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.