Nested bulk

I'm using elasticsearch-dsl-py to create an index with a nested field (points):

class Shape(DocType):
    route = Text()
    startData = Date()
    points = Nested(
	    'segmentStart': Integer(),
	    'longitude': Float(),
	    'latitude': Float(),

    class Meta:
	    index = "shapes"
	    doc_type = "*"

The index is successfully created with the right mapping. But when I load the data it doesn't consider the nested field and the data isn't loaded the way it should.

This is the code I use to load the data:

with open("/path/to/datafile", "r") as f:
    reader = csv.DictReader(f, delimiter='|')
        (Shape(**row).to_dict(True) for row in reader)

The data file header is route|startDate|segmentStart|longitude|latitude.

I think something is wrong with the way I'm using to_dict (I'm not redifining it), but haven't been able to find a functional example for a bulk with a nested field.

Any help is appreciated.

For this to work the data need to look like:

  "route": "test-route,
  "startData": datetime(2018, 1, 1, 10, 10, 10),
  "points": [
        "segmentStart": 42,
        "longtitude": 0.12,
        "lattitude": 42.1

and not like it does for you, which is just flat.

Hope this helps!

btw if you only ever expect one value for points field, and not a list, you don't need Nested and you should use Object instead.

This is how my data file looks (there are thousands of lines):

B01 00I|2017-04-01|1|-33.400169|-70.620153
B01 00I|2017-04-01|0|-33.400083|-70.620549
B01 00I|2017-04-01|0|-33.400909|-70.623564
B01 00I|2017-04-01|1|-33.400127|-70.625097

I'm not sure about what you mean by "flat". And yes, each property of the points field always has one value on each line. I use nested because I thought that was the right thing to do (according to what I found).

The problem is really this one: when I upload the data using Logstash, it looks like this:


You can see a list of points for each route. I want to achieve the same thing using elasticsearch-py (DSL or non DSL).

When I upload the data using Logstash, I use exactly the same datafile and the same mapping (in JSON format).

Just in case, the mapping looks like this:

  "mappings": {
    "*": {
      "properties": {
        "route" : {"type": "keyword"},
        "startDate" : {"type": "date"},
        "points": {
          "properties": {
            "segmentStart": {"type": "integer"},
            "longitude": {"type": "float"},
            "latitude": {"type": "float"}

I see, so you have one file per shape/route or do you have multiple in the same file?

Multiple routes in one file.

Ok, so you need to create a list of points for each route and then index them in:

import csv
from itertools import groupby

def read_routes():
    with open("/path/to/datafile", "r") as f:
        reader = csv.DictReader(f, delimiter='|')
        # separate data into groups by the value of 'route' key
        for route, points in groupby(reader, lambda p: p['route']):
            points = list(points)

            # take first point's startData as the route's
            startData = points[0]
            points = [
                    'segment_start': p['segmentStart'],
                    'longtitude': p['longtitude',
                    'lattitude': p['lattitude']
                } for p in points
            yield Shape(route=route, startData=startData, points=points)
    (s.to_dict(True) for s in read_routes())

Thank you very much! I made some simple changes to the code you gave me and now my program works. I couldn't have done it without your help :slight_smile:

But now I have a last question. If I wanted to do the same thing using the non DSL API, should the yield part be this way?

yield {
    "_index": INDEX_NAME,
    "_type": DOC_TYPE,
    "_source": {
        "route": route,
        "startDate": startDate,
        "points": points

And in that case, what should the parameters in the bulk command be? I think I can't use to_dict in this case. (Thank you again.)

1 Like

exactly! And then the bulk would be: bulk(es, read_routes()).

You could even just yield the _source content and add the index and doc_type parems to the bulk: bulk(es, read_routes(), index=INDEX_NAME, doc_type=DOC_TYPE). That makes for the shortest code :slight_smile:

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.