Bulk API returns 201 or 200 but no documents in index (confused)

Hello

I am using the elasticsearch-py python client to bulk index a bunch of documents.

I currently am experimenting with elasticsearch on the cloud.
My workflow is:
delete index 'blah' if exist
create index 'blah'

then for 10000 documents save them to the index.

If I use the index function (http://elasticsearch-py.readthedocs.org/en/master/api.html#elasticsearch.Elasticsearch.index) for a single document at a time api i report 200 or 201 and the document is able to be retrieved via a curl.

When using the bulk function (http://elasticsearch-py.readthedocs.org/en/master/api.html#elasticsearch.Elasticsearch.bulk) I also get 200 or 201 for every document. However when curl those documents i am unable to retrieve them. Also looking at this via '_stats' endpoint it show no document in the index.

Must be something very dumb I am missing and hoping for any help.

Thanks
Shon

Can you send a script which reproduce the issue so we can tell what is wrong?

Thanks here it is:

from __future__ import division, print_function

import elasticsearch
import elasticsearch.helpers

from elasticsearch_dsl import DocType, String, Long, MetaField, Index, Nested
from elasticsearch_dsl.connections import connections

from django.db import connection
from models import Item, ItemPrice, Manufacturer


class Product(DocType):

    availability = String()
    description = String(index='no')
    dimensions = String(index='no')
    manufacturer_id = String(index='no')
    name = String(index='not_analyzed')
    position = Long(index='no')
    sku = String(index='not_analyzed')
    uuid = String(index='not_analyzed')

    variants = Nested(
        properties={
            'name': String(),
            'sku': String(),
            'description': String()
        }
    )

    category = Nested(
        properties={
            'categoryposition': Long(index='no'),
            'depth': Long(index='no'),
            'logourl': String(index='no'),
            'manufacturer_id': String(index='no'),
            'name': String(index='no'),
            'netobjectid': Long(index='no'),
            'parent_id': String(index='no'),
            'path': String(index='not_analyzed'),
            'uuid': String(index='not_analyzed'),
        }
    )

    class Meta:
        dynamic = MetaField('off')

class Indexer(object):

    def __init__(self, elastic_hosts, username=None, password=None):

        self.elastic_hosts = elastic_hosts

        auth = (username, password) if username else None

        # magic global connection to elasticsearch db
        connections.create_connection(hosts=elastic_hosts, http_auth=auth)
        self.client = elasticsearch.Elasticsearch()

    def create_index(self, account_id):
        """
            Create an elastic index (db) for named <account_id>.
            Deletes an existing index if it exists before creating a new one.

        :param account_id:
        :return:
        """

        acct_index = Index(account_id.lower())

        #TODO: - Turn this into zero downtime reindexes using aliases
        #  See: https://www.elastic.co/blog/changing-mapping-with-zero-downtime
        #  Or use an id to when creating docs... however i think then we would need to worry about item deletes
        acct_index.delete(ignore=404)
        acct_index = Index(account_id.lower())

        acct_index.doc_type(Product)

        try:
            acct_index.create()
        except elasticsearch.exceptions.RequestError as e:
            print(e)

        self.client.cluster.health(wait_for_status='yellow')

    @staticmethod
    def to_elastic_dict(prod):
        return dict(meta={'id': prod.netObjectID}, name=prod.name, sku=prod.sku, description=prod.longDesc)

    def index_products(self, account_id):

        def generative_bulk():

            prod = None

            items_q = Item.objects.filter(account_id=account_id)

            for item in items_q.all():

                e_dic = self.to_elastic_dict(item)
                prod = Product(**e_dic)
                #bulk_dict = prod.to_dict(include_meta=True)
                yield prod # bulk_dict

            if prod is None:
                print("NO ITEMS!!!!")

        for item in generative_bulk():
            item.save()

        # below should work instead but it doesn't
        # import pprint
        # pprint.pprint(list(elasticsearch.helpers.streaming_bulk(client=self.client, actions=generative_bulk(), refresh=True))
if name == '__name__':
    elastic_indexer = indexer.Indexer(['https://elastic.cloud.on.aws'], 'user', 'pass')
    elastic_indexer.create_index('TEST_ACCT')
    elastic_indexer.index_products('TEST_ACCT')

For the record I am testing this against a elastic cloud instance. I am using Shield with a user with read/write privileges

Can you reproduce with a pure curl/SENSE script?

Sorry for the late reply. I Got it to work basically with what's posted. Slight typo in original code.