Python DSL for Elasticsearch
In the previous post we learned about elasticsearch-dsl
and created an index in Elasticsearch to store all of our git history, now it's time to load in the data and see what we can do with it.
Load data
To load data we first create the actual function to generate all out Commit
objects. We are using a generator to avoid loading all of the data into memory in our load script; this is super important for larger datasets where creating a list
would severely impact performance or completely crash. We are using the GitPython
package to parse the information from git history.
from datetime import datetime
from git import Repo
def traverse_history(repo):
head = repo.refs.master.commit
for commit in head.traverse():
# use a generator so that we don't have to hold everything in memory
yield Commit(
# specify ID as _id, no need to map it, it's always a string
_id=commit.hexsha,
committed_date=datetime.fromtimestamp(commit.committed_date),
committer=User(
name=commit.committer.name,
email=commit.committer.email,
),
authored_date=datetime.fromtimestamp(commit.authored_date),
author=User(
name=commit.author.name,
email=commit.author.email,
),
message=commit.message,
parent_shas=[p.hexsha for p in commit.parents],
files=list(commit.stats.files)
)
The generator is very straightforward, just creating instances of Commit
class and yield
ing them out. We specify an _id
value representing the unique identifier of the document in Elasticsearch, this is not a field that will be present in the document and also doesn't have to be mapped in the class definition - all documents in Elasticsearch always have an _id
which is a string. Indexing a document with the same _id
twice will overwrite the existing document.
The simplest way to save a Document
into Elasticsearch is to just call .save()
method on the instance:
repo = Repo(path)
for c in traverse_history(repo):
c.save()
While this code is simple it is also very inefficient since it is indexing one document at a time. When we are loading multiple documents it is always recommended to use the _bulk
API, in our case we will be using the bulk helper from the underlying elasticsearch-py
library. This helper accepts any iterable (so our generator is fine) of dict
objects, meaning we have to properly serialize our documents into dict
by calling to_dict
, we also need to pass in include_meta=True
to include the metadata fields like the document's _id
.
from elasticsearch_dsl import connections
from elasticsearch import helpers
repo = Repo(path)
# obtain the low-level connection
es = connections.get_connection()
helpers.bulk(
es,
# generator expression to properly serialize our Commits
(c.to_dict(include_meta=True) for c in traverse_history(repo))
)
In our case this is sufficient but in advanced use cases we might also want to run full_clean()
method on the Commit
objects before calling to_dict()
to make sure everything is validated and also any auto-generated fields are populated properly (as those can be done in clean
as seen in the completion.py
example.
Run Queries
Now we have all that we need to start running some queries against our commit history. To create a Search
object querying against our index and documents use the .search()
class method, the Search
object contains all that we need to run any search request:
s = Commit.search()
# return the count of documents matching our search
s.count()
# only limit ourselves to commits that claim to be fixing something
s = s.query('match', message='fix')
# ask for 5 hits instead of the default 10
s = s[:5]
# return hits sorted by committed_date descending
s = s.sort('-committed_date')
# iterating over the Search will execute it and iterate over matching documents
for hit in s:
print(
# you can access any metadata field through .meta
hit.meta.id[:6],
# committed_date is a datetime object
hit.committed_date.strftime('%Y-%m-%d'),
# we can use any methods we defined on Commit
hit.subject()
)
This will produce output listing the latest 5 commits that mention fix
:
52f329 2018-11-21 Fix for python 2
0b39c1 2018-11-21 Fix Completion field
6b28f6 2018-11-15 Correct test failure related to Python 3 fix as .serialize() now returns str.
ae5d06 2018-11-15 Fix serialization of Binary fields in Python 3.
d59b01 2018-11-15 Fix location field typo when creating user
All of the above operations, up until the iteration, are lazy - they don't actually execute the search and just alter the search definition locally without sending anything to Elasticsearch. The operations also return a copy of the object, meaning we have to remember to assign back to itself if we intend to keep the copy.
Next let's have a look at some aggregations, those work a little bit differently as they actually modify the Search
object in place instead of returning that copy. This is done so that we can support nested aggregations:
from elasticsearch_dsl import Range
# again start with an empty search
s = Commit.search()
# only get data for 2018
s = s.filter('range', committed_date=Range(gte=datetime(2018, 1, 1)))
# this time we are not insterested in hits, so let's set the size to 0
s = s[:0]
# any aggregations are to be defined on .aggs
# start with a (bucketing) date_histogram aggregation
s.aggs.bucket(
# our identifier
'months',
# type of the aggregation
'date_histogram',
# any additional kwargs will be converted to parameters of the aggregation
# see Elasticsearch documentation for the list of possible parameters
field='committed_date',
interval='month'
).metric(
# we can nest other aggs inside of a bucket aggregation just by chaining the methods
'authors', 'cardinality', field='author.name.keyword'
)
# explicitly execute the search request
response = s.execute()
# agg results are in .aggregations.OUR_NAME
for month in response.aggregations.months.buckets:
print(month.key.strftime('%Y-%m'), "|", month.authors.value)