Hello. I'm taking my first steps loading data from SQL Server into Elasticsearch with Python, and I'm probably doing many things wrong. I'm getting a syntax error — could you help, please?
import pyodbc
from elasticsearch import Elasticsearch,helpers
from elasticsearch.helpers import bulk, streaming_bulk
# Load the top 10 rows of a SQL Server table into Elasticsearch using the raw
# bulk API (es.bulk), then run a small match_all search to check the result.
ES_HOST = {"host": "localhost", "port": 9200}
INDEX_NAME = 'aaa'
TYPE_NAME = 'bbb'

server = 'xxx'
database = 'xxx'
username = 'xxx'
password = 'xxx'
driver = '{ODBC Driver 13 for SQL Server}'

# NOTE(review): PORT=1443 looks like a typo for SQL Server's usual 1433, and
# Microsoft's ODBC driver takes the port as "SERVER=host,port" rather than a
# PORT keyword -- confirm against your server before relying on it.
cnxn = pyodbc.connect('DRIVER=' + driver + ';SERVER=' + server + ';PORT=1443;DATABASE=' + database + ';UID=' + username + ';PWD=' + password)
try:
    cursor = cnxn.cursor()
    cursor.execute("select top 10 a,b,c,d from table")
    rows = cursor.fetchall()
    cursor.close()
finally:
    # All rows are already in memory; release the DB connection right away.
    cnxn.close()

# The bulk API body is a sequence of PAIRS: an action/metadata line
# ({"index": {...}}) followed by the document source on the next line.
# The source must not be nested inside the action.
bulk_data = []
for row in rows:
    bulk_data.append({"index": {"_index": INDEX_NAME, "_type": TYPE_NAME}})
    bulk_data.append({"a": row.a, "b": row.b, "c": row.c, "d": row.d})

es = Elasticsearch(hosts=[ES_HOST])

# Recreate the index from scratch on every run.
if es.indices.exists(INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

request_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        # The mapping key is the document type. It must match the "_type"
        # used in bulk_data, or the explicit mapping below is not applied to
        # the indexed documents.
        TYPE_NAME: {
            "properties": {
                "a": {"type": "text"},
                "b": {"type": "text"},
                "c": {"type": "integer"},
                "d": {"type": "integer"},
            }
        }
    }
}
es.indices.create(index=INDEX_NAME, body=request_body)

# An empty bulk request is rejected by Elasticsearch, so skip it when the
# query returned no rows.
if bulk_data:
    resp = es.bulk(index=INDEX_NAME, body=bulk_data)
    # es.bulk does not raise on per-document failures; they are only reported
    # in the response ("errors" flag plus per-item details).
    if resp.get("errors"):
        raise RuntimeError("bulk indexing reported errors: %r" % resp["items"])

# Make the freshly indexed documents visible to search before querying.
es.indices.refresh(index=INDEX_NAME)
res = es.search(index=INDEX_NAME, size=2, body={"query": {"match_all": {}}})
print(res["hits"]["hits"])
dadoonet
(David Pilato)
March 16, 2017, 11:06am
2
I don't really read Python, but I believe you are not respecting the bulk format here.
Read https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html
Thanks for the quick reply! It's all sorted now. Here is the script that worked (simplified from the version I actually used). The Bulk API part was all right; the problem was a syntax error further down. Thanks!
from elasticsearch import Elasticsearch, helpers
from time import sleep
import pyodbc
from time import sleep
# Load rows from a SQL Server table into Elasticsearch with helpers.bulk.
ES_HOST = {"host": "localhost", "port": 9200}
INDEX_NAME = 'aaa'
TYPE_NAME = 'aa'

server = 'xxx'
database = 'xx'
username = 'xxx'
password = 'xxx'
driver = '{ODBC Driver 13 for SQL Server}'

# NOTE(review): PORT=1443 looks like a typo for SQL Server's usual 1433, and
# Microsoft's ODBC driver takes the port as "SERVER=host,port" rather than a
# PORT keyword -- confirm against your server before relying on it.
cnxn = pyodbc.connect('DRIVER=' + driver + ';SERVER=' + server + ';PORT=1443;DATABASE=' + database + ';UID=' + username + ';PWD=' + password)
try:
    cursor = cnxn.cursor()
    cursor.execute("select a,b,c from xxx")
    rows = cursor.fetchall()
    cursor.close()
finally:
    # All rows are already in memory; release the DB connection right away.
    cnxn.close()

# Reuse ES_HOST instead of repeating the host/port literal.
es_client = Elasticsearch(hosts=[ES_HOST])

# helpers.bulk takes one dict per document: routing metadata ("_index",
# "_type") plus the document body under "_source".
bulk_data = [
    {
        "_index": INDEX_NAME,
        "_type": TYPE_NAME,
        "_source": {
            "a": row.a,
            "b": str(row.b),
            "c": str(row.c),
        },
    }
    for row in rows
]

# Recreate the index from scratch on every run.
if es_client.indices.exists(INDEX_NAME):
    es_client.indices.delete(index=INDEX_NAME)

request_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        # The mapping key is the document type. It must match the "_type"
        # used in bulk_data (previously it was the index name 'aaa' while the
        # documents used TYPE_NAME), or the explicit mapping is not applied.
        TYPE_NAME: {
            "properties": {
                "a": {"type": "date"},
                "b": {"type": "text"},
                "c": {"type": "text"}
            }
        }
    }
}
es_client.indices.create(index=INDEX_NAME, body=request_body)

# helpers.bulk raises on failed documents by default (raise_on_error=True).
helpers.bulk(es_client, bulk_data)

# A single explicit refresh makes all documents searchable; no sleep() is
# needed, and refreshing after every bulk chunk is unnecessary.
es_client.indices.refresh(index=INDEX_NAME)
dadoonet
(David Pilato)
March 17, 2017, 11:03am
4
Great. Note that `sleep(1)` is not needed, as you are calling refresh.
system
(system)
Closed
April 14, 2017, 11:04am
5
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.