surbas
(Shon Urbas)
March 12, 2016, 10:40pm
1
Hello
I am using the elasticsearch-py Python client to bulk index a bunch of documents.
I am currently experimenting with Elasticsearch on the cloud.
My workflow is:
1. Delete index 'blah' if it exists.
2. Create index 'blah'.
3. Save 10000 documents to the index.
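The workflow above can be sketched roughly as follows. This is a minimal illustration, not the poster's code: `rebuild_index` and its parameters are hypothetical names, `client` is assumed to be an `elasticsearch.Elasticsearch` instance from the elasticsearch-py releases of that era (which accepted `ignore=404` and a `doc_type` argument), and `docs` is assumed to be an iterable of `(doc_id, body)` pairs.

```python
def rebuild_index(client, index_name, docs, doc_type="product"):
    """Drop and recreate ``index_name``, then index ``docs`` one at a time.

    ``client`` is assumed to behave like elasticsearch.Elasticsearch;
    ``docs`` is an iterable of (doc_id, body_dict) pairs (hypothetical shape).
    Returns the number of documents sent.
    """
    # Delete the index if it exists; a 404 (no such index) is not an error here.
    client.indices.delete(index=index_name, ignore=404)
    client.indices.create(index=index_name)

    sent = 0
    for doc_id, body in docs:
        # One HTTP request per document: simple, but slow for large batches.
        client.index(index=index_name, doc_type=doc_type, id=doc_id, body=body)
        sent += 1
    return sent
```

Each call to `client.index` is a separate round trip, which is why the bulk API is attractive for 10000 documents.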
If I use the index function (http://elasticsearch-py.readthedocs.org/en/master/api.html#elasticsearch.Elasticsearch.index) one document at a time, the API reports 200 or 201 and each document can be retrieved via curl.
When using the bulk function (http://elasticsearch-py.readthedocs.org/en/master/api.html#elasticsearch.Elasticsearch.bulk) I also get 200 or 201 for every document. However, when I curl those documents I am unable to retrieve them. Also, the '_stats' endpoint shows no documents in the index.
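One way to double-check the per-document statuses is to walk the dictionary that the bulk call returns. The helper below is a small sketch under stated assumptions: `summarize_bulk_response` is a hypothetical name, and the response is assumed to follow the standard bulk layout with a top-level `items` list whose entries are keyed by operation type.

```python
def summarize_bulk_response(response):
    """Split a bulk API response into successes and failures.

    ``response`` is assumed to be the dict returned by the bulk API:
    {"errors": bool, "items": [{"<op>": {"_id": ..., "status": ...}}, ...]}.
    Returns (ok_ids, failures); failures is a list of (id, status, error).
    """
    ok_ids, failures = [], []
    for item in response.get("items", []):
        # Each item is a single-key dict keyed by the operation type
        # ("index", "create", "update" or "delete").
        for op_type, result in item.items():
            status = result.get("status")
            if status is not None and 200 <= status < 300:
                ok_ids.append(result.get("_id"))
            else:
                failures.append((result.get("_id"), status, result.get("error")))
    return ok_ids, failures
```

If every item lands in `ok_ids` but the documents still cannot be fetched, the next thing to compare is which host and index the bulk request was actually sent to.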
I must be missing something very dumb and am hoping for any help.
Thanks
Shon
dadoonet
(David Pilato)
March 12, 2016, 11:10pm
2
Can you send a script which reproduces the issue so we can tell what is wrong?
surbas
(Shon Urbas)
March 12, 2016, 11:54pm
3
Thanks, here it is:
from __future__ import division, print_function

import elasticsearch
import elasticsearch.helpers
from elasticsearch_dsl import DocType, String, Long, MetaField, Index, Nested
from elasticsearch_dsl.connections import connections
from django.db import connection
from models import Item, ItemPrice, Manufacturer


class Product(DocType):
    availability = String()
    description = String(index='no')
    dimensions = String(index='no')
    manufacturer_id = String(index='no')
    name = String(index='not_analyzed')
    position = Long(index='no')
    sku = String(index='not_analyzed')
    uuid = String(index='not_analyzed')
    variants = Nested(
        properties={
            'name': String(),
            'sku': String(),
            'description': String()
        }
    )
    category = Nested(
        properties={
            'categoryposition': Long(index='no'),
            'depth': Long(index='no'),
            'logourl': String(index='no'),
            'manufacturer_id': String(index='no'),
            'name': String(index='no'),
            'netobjectid': Long(index='no'),
            'parent_id': String(index='no'),
            'path': String(index='not_analyzed'),
            'uuid': String(index='not_analyzed'),
        }
    )

    class Meta:
        dynamic = MetaField('off')


class Indexer(object):
    def __init__(self, elastic_hosts, username=None, password=None):
        self.elastic_hosts = elastic_hosts
        auth = (username, password) if username else None
        # magic global connection to elasticsearch db
        connections.create_connection(hosts=elastic_hosts, http_auth=auth)
        self.client = elasticsearch.Elasticsearch()

    def create_index(self, account_id):
        """
        Create an elastic index (db) for named <account_id>.
        Deletes an existing index if it exists before creating a new one.
        :param account_id:
        :return:
        """
        acct_index = Index(account_id.lower())
        #TODO: - Turn this into zero downtime reindexes using aliases
        # See: https://www.elastic.co/blog/changing-mapping-with-zero-downtime
        # Or use an id to when creating docs... however i think then we would need to worry about item deletes
        acct_index.delete(ignore=404)
        acct_index = Index(account_id.lower())
        acct_index.doc_type(Product)
        try:
            acct_index.create()
        except elasticsearch.exceptions.RequestError as e:
            print(e)
        self.client.cluster.health(wait_for_status='yellow')

    @staticmethod
    def to_elastic_dict(prod):
        return dict(meta={'id': prod.netObjectID}, name=prod.name, sku=prod.sku, description=prod.longDesc)

    def index_products(self, account_id):
        def generative_bulk():
            prod = None
            items_q = Item.objects.filter(account_id=account_id)
            for item in items_q.all():
                e_dic = self.to_elastic_dict(item)
                prod = Product(**e_dic)
                #bulk_dict = prod.to_dict(include_meta=True)
                yield prod  # bulk_dict
            if prod is None:
                print("NO ITEMS!!!!")

        for item in generative_bulk():
            item.save()
        # below should work instead but it doesn't
        # import pprint
        # pprint.pprint(list(elasticsearch.helpers.streaming_bulk(client=self.client, actions=generative_bulk(), refresh=True)))
if __name__ == '__main__':
    elastic_indexer = Indexer(['https://elastic.cloud.on.aws'], 'user', 'pass')
    elastic_indexer.create_index('TEST_ACCT')
    elastic_indexer.index_products('TEST_ACCT')
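For comparison, here is a hedged sketch of how the commented-out bulk path could be wired up. `elasticsearch.helpers.streaming_bulk` is documented to take an iterable of action dicts, so the sketch builds those dicts by hand instead of yielding `Product` objects. `build_actions` and `bulk_index` are hypothetical names, the `product` type name is an assumption, and the field mapping mirrors `to_elastic_dict` above. The thread does not say which typo was eventually fixed, so this is an illustration rather than the confirmed fix.

```python
def build_actions(index_name, items, doc_type="product"):
    """Yield one bulk action dict per item.

    ``items`` is assumed to be an iterable of objects exposing netObjectID,
    name, sku and longDesc, as used by to_elastic_dict in the script.
    """
    for item in items:
        yield {
            "_op_type": "index",
            "_index": index_name,
            "_type": doc_type,  # assumed mapping type name
            "_id": item.netObjectID,
            "_source": {
                "name": item.name,
                "sku": item.sku,
                "description": item.longDesc,
            },
        }


def bulk_index(client, index_name, items):
    """Stream the actions to Elasticsearch; return (n_ok, failed_results)."""
    # Imported lazily so build_actions works without the client library.
    from elasticsearch import helpers

    n_ok, failed = 0, []
    # raise_on_error=False reports failures as (False, info) tuples
    # instead of raising on the first failed chunk.
    for ok, result in helpers.streaming_bulk(
            client, build_actions(index_name, items), raise_on_error=False):
        if ok:
            n_ok += 1
        else:
            failed.append(result)
    return n_ok, failed
```

In the script above, `client` could be the connection registered by `connections.create_connection` (available through `connections.get_connection()`), since a bare `elasticsearch.Elasticsearch()` with no arguments targets `localhost:9200` rather than the hosts passed to `Indexer`. Calling `client.indices.refresh(index=index_name)` afterwards makes the new documents visible to search.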
surbas
(Shon Urbas)
March 13, 2016, 12:00am
4
For the record, I am testing this against an Elastic Cloud instance. I am using Shield with a user that has read/write privileges.
dadoonet
(David Pilato)
March 13, 2016, 2:26am
5
Can you reproduce with a pure curl/SENSE script?
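One way to get such a curl/Sense reproduction is to print the raw body that the bulk API takes: newline-delimited JSON with an action line followed by a source line per document, and a trailing newline. The sketch below uses only the standard library; `to_bulk_body`, the index name, and the `product` type are illustrative assumptions, not taken from the poster's setup.

```python
import json


def to_bulk_body(index_name, docs, doc_type="product"):
    """Render (doc_id, source_dict) pairs as a _bulk request body (NDJSON)."""
    lines = []
    for doc_id, source in docs:
        action = {"index": {"_index": index_name, "_type": doc_type, "_id": doc_id}}
        lines.append(json.dumps(action, sort_keys=True))
        lines.append(json.dumps(source, sort_keys=True))
    # Every line, including the last one, must be terminated by a newline.
    return "".join(line + "\n" for line in lines)
```

The resulting text can be pasted into Sense after a `POST _bulk` line, or saved to a file (say `body.json`, a hypothetical name) and sent to the cluster's `_bulk` endpoint with `curl --data-binary @body.json`.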
surbas
(Shon Urbas)
April 10, 2016, 7:55pm
6
Sorry for the late reply. I got it to work with basically what's posted. There was a slight typo in the original code.