Dec 20th, 2018: [EN][Python/Elasticsearch] Python DSL for Elasticsearch

Python DSL for Elasticsearch

In the previous post we learned about elasticsearch-dsl and created an index in Elasticsearch to store all of our git history. Now it's time to load in the data and see what we can do with it.

Load data

To load the data we first create a function that generates all of our Commit objects. We use a generator to avoid loading all of the data into memory in our load script; this is important for larger datasets, where building a list would severely impact performance or crash the script outright. We use the GitPython package to parse the information from the git history.


from datetime import datetime
from git import Repo

def traverse_history(repo):
    head = repo.refs.master.commit
    for commit in head.traverse():
        # use a generator so that we don't have to hold everything in memory
        yield Commit(
            # specify ID as _id, no need to map it, it's always a string
            _id=commit.hexsha,

            committed_date=datetime.fromtimestamp(commit.committed_date),
            committer=User(
                name=commit.committer.name,
                email=commit.committer.email,
            ),
            authored_date=datetime.fromtimestamp(commit.authored_date),
            author=User(
                name=commit.author.name,
                email=commit.author.email,
            ),
            message=commit.message,
            parent_shas=[p.hexsha for p in commit.parents],
            files=list(commit.stats.files)
        )

The generator is very straightforward: it just creates instances of the Commit class and yields them out. We specify an _id value representing the unique identifier of the document in Elasticsearch. It is not a field that will be present in the document and it doesn't have to be mapped in the class definition; all documents in Elasticsearch have an _id, which is always a string. Indexing a document with the same _id twice will overwrite the existing document.

The simplest way to save a Document into Elasticsearch is to call the .save() method on the instance:

repo = Repo(path)
for c in traverse_history(repo):
    c.save()
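
Since the _id stays the same across saves, we can also fetch a document back and re-save it without creating a duplicate. A minimal sketch (some_sha is a placeholder for a real commit hash that is already in the index):

# some_sha stands in for a real commit hash already present in the index
c = Commit.get(id=some_sha)

# the _id lives in the document's metadata, not among its fields
print(c.meta.id)

# saving again reuses the same _id, so the stored document is overwritten
c.message = c.message.strip()
c.save()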

While calling .save() in a loop is simple, it is also very inefficient since it indexes one document at a time. When loading multiple documents it is always recommended to use the _bulk API; in our case we will use the bulk helper from the underlying elasticsearch-py library. This helper accepts any iterable of dict objects (so our generator is fine), meaning we have to serialize our documents into dicts by calling to_dict(), passing in include_meta=True to include the metadata fields like the document's _id.


from elasticsearch_dsl import connections
from elasticsearch import helpers

repo = Repo(path)

# obtain the low-level connection
es = connections.get_connection()

helpers.bulk(
    es,
    # generator expression to properly serialize our Commits
    (c.to_dict(include_meta=True) for c in traverse_history(repo))
)

In our case this is sufficient, but in more advanced use cases we might also want to run the full_clean() method on the Commit objects before calling to_dict() to make sure everything is validated and any auto-generated fields are populated properly (as those can be done in clean(), as seen in the completion.py example).
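
As a minimal sketch, one way to wire that in is a small helper around the same bulk call (serialize is just an illustrative name, not part of the library):

def serialize(commit):
    # validate the document and run any custom clean() logic
    commit.full_clean()
    return commit.to_dict(include_meta=True)

helpers.bulk(es, (serialize(c) for c in traverse_history(repo)))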

Run Queries

Now we have all that we need to start running some queries against our commit history. To create a Search object querying our index and documents, use the .search() class method; the Search object contains everything we need to run any search request:

s = Commit.search()

# return the count of documents matching our search
s.count()

# only limit ourselves to commits that claim to be fixing something
s = s.query('match', message='fix')

# ask for 5 hits instead of the default 10
s = s[:5]

# return hits sorted by committed_date descending
s = s.sort('-committed_date')

# iterating over the Search will execute it and iterate over matching documents
for hit in s:
    print(
        # you can access any metadata field through .meta
        hit.meta.id[:6],
        # committed_date is a datetime object
        hit.committed_date.strftime('%Y-%m-%d'),
        # we can use any methods we defined on Commit
        hit.subject()
    )

This will produce output listing the latest 5 commits that mention fix:

52f329 2018-11-21 Fix for python 2
0b39c1 2018-11-21 Fix Completion field
6b28f6 2018-11-15 Correct test failure related to Python 3 fix as .serialize() now returns str.
ae5d06 2018-11-15 Fix serialization of Binary fields in Python 3.
d59b01 2018-11-15 Fix location field typo when creating user

All of the above operations, up until the iteration, are lazy: they don't actually execute the search, they just alter the search definition locally without sending anything to Elasticsearch. The operations also return a copy of the Search object, meaning we have to remember to assign the result back to our variable if we want to keep the modified definition.
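
A quick sketch of that behavior; nothing is sent to Elasticsearch until we iterate over the Search or call execute():

base = Commit.search()

# query() returns a modified copy; base itself stays untouched
fixes = base.query('match', message='fix')

print(base.to_dict())   # {}
print(fixes.to_dict())  # {'query': {'match': {'message': 'fix'}}}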

Next let's have a look at some aggregations. Those work a little bit differently, as they actually modify the Search object in place instead of returning a copy. This is done so that we can support nested aggregations:

from elasticsearch_dsl import Range

# again start with an empty search
s = Commit.search()

# only get data for 2018
s = s.filter('range', committed_date=Range(gte=datetime(2018, 1, 1)))

# this time we are not interested in hits, so let's set the size to 0
s = s[:0]

# any aggregations are to be defined on .aggs
# start with a (bucketing) date_histogram aggregation
s.aggs.bucket(
    # our identifier
    'months',

    # type of the aggregation
    'date_histogram',

    # any additional kwargs will be converted to parameters of the aggregation
    # see Elasticsearch documentation for the list of possible parameters
    field='committed_date',
    interval='month'
).metric(
    # we can nest other aggs inside of a bucket aggregation just by chaining the methods
    'authors', 'cardinality', field='author.name.keyword'
)

# explicitly execute the search request
response = s.execute()

# agg results are in .aggregations.OUR_NAME
for month in response.aggregations.months.buckets:
    print(month.key.strftime('%Y-%m'), "|", month.authors.value)

This will give us the number of distinct contributors we had in each month of 2018:

2018-01 | 2
2018-02 | 4
2018-03 | 4
2018-04 | 0
2018-05 | 2
2018-06 | 1
2018-07 | 4
2018-08 | 4
2018-09 | 1
2018-10 | 2
2018-11 | 7

You can see the repeated pattern of accessing response items via attribute notation (month.authors.value as opposed to month['authors']['value']) and also the fields being properly deserialized when mapped (month.key being an instance of datetime).
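
Both access styles work on the response objects, as this small sketch (reusing the response from the aggregation above) shows:

first = response.aggregations.months.buckets[0]

# attribute access and item access are interchangeable
print(first.authors.value)
print(first['authors']['value'])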

Finally, let's put queries and aggregations together for one last example that I actually use regularly on this dataset. I present it here without comments and will explain what it does in the next part of this blog series:

s = Commit.search()[:0]
s = s.filter('match', message='fix')
s = s.exclude('prefix', files='test_elasticsearch_dsl')
s.aggs.bucket('authors', 'terms', field='author.name.keyword', size=1)