I'm using elasticsearch-dsl-py to create an index with a nested field (points):
from elasticsearch_dsl import DocType, Text, Date, Nested, Integer, Float

class Shape(DocType):
    route = Text()
    startDate = Date()
    points = Nested(
        properties={
            'segmentStart': Integer(),
            'longitude': Float(),
            'latitude': Float(),
        }
    )

    class Meta:
        index = "shapes"
        doc_type = "*"

Shape.init()
The index is successfully created with the right mapping. But when I load the data, the nested field is ignored and the documents aren't indexed the way they should be.
This is the code I use to load the data:
import csv

from elasticsearch.helpers import bulk
from elasticsearch_dsl.connections import connections

with open("/path/to/datafile", "r") as f:
    reader = csv.DictReader(f, delimiter='|')
    bulk(
        connections.get_connection(),
        (Shape(**row).to_dict(True) for row in reader)
    )
The data file header is route|startDate|segmentStart|longitude|latitude.
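For illustration, a few rows of the file would look something like this (these values are made up, not taken from the real datafile):

route|startDate|segmentStart|longitude|latitude
A|2017-01-01|0|-70.65|-33.45
A|2017-01-01|1|-70.66|-33.46
B|2017-01-01|0|-70.70|-33.50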
I think something is wrong with the way I'm using to_dict (I'm not redefining it), but I haven't been able to find a working example of a bulk load with a nested field.
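As a sanity check, to_dict(True) (i.e. include_meta=True) should produce a bulk-ready action dict when the document is built with the points already nested; a quick test with made-up values would look something like:

s = Shape(route="A", startDate="2017-01-01",
          points=[{'segmentStart': 0, 'longitude': -70.65, 'latitude': -33.45}])
print(s.to_dict(True))
# should print something along the lines of:
# {'_index': 'shapes', '_type': ..., '_source': {'route': 'A', ..., 'points': [...]}}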
I'm not sure what you mean by "flat". And yes, each property of the points field has exactly one value on each line. I used Nested because I thought that was the right thing to do (from what I found).
The real problem is this: when I upload the data using Logstash, each route document contains a list of its points. That's what I want to achieve using elasticsearch-py (DSL or not).
With Logstash I use exactly the same datafile and the same mapping (in JSON format).
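For reference, the nested part of that JSON mapping presumably looks something like this (reconstructed from the DSL code above, not copied from the actual file; the "shape" type name is a guess):

{
  "mappings": {
    "shape": {
      "properties": {
        "route":     {"type": "text"},
        "startDate": {"type": "date"},
        "points": {
          "type": "nested",
          "properties": {
            "segmentStart": {"type": "integer"},
            "longitude":    {"type": "float"},
            "latitude":     {"type": "float"}
          }
        }
      }
    }
  }
}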
OK, so you need to build a list of points for each route and then index one document per route:
import csv
from itertools import groupby

from elasticsearch.helpers import bulk
from elasticsearch_dsl.connections import connections

def read_routes():
    with open("/path/to/datafile", "r") as f:
        reader = csv.DictReader(f, delimiter='|')
        # Group the rows by the value of the 'route' key. Note that
        # groupby only merges *adjacent* rows, so the file must already
        # be sorted (or at least grouped) by route.
        for route, points in groupby(reader, lambda p: p['route']):
            points = list(points)
            # take the first point's startDate as the route's
            startDate = points[0]['startDate']
            points = [
                {
                    'segmentStart': p['segmentStart'],
                    'longitude': p['longitude'],
                    'latitude': p['latitude'],
                } for p in points
            ]
            yield Shape(route=route, startDate=startDate, points=points)

bulk(
    connections.get_connection(),
    (s.to_dict(True) for s in read_routes())
)
Exactly! And then the bulk call would just be: bulk(es, read_routes()).
You could even just yield the _source content and add the index and doc_type params to the bulk call: bulk(es, read_routes(), index=INDEX_NAME, doc_type=DOC_TYPE). That makes for the shortest code.
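A minimal sketch of that variant, assuming es is your Elasticsearch client (the read_route_sources helper and the "shapes"/"shape" names are placeholders, not part of the original code):

def read_route_sources():
    # Yield only the _source dicts; bulk() fills in index/doc_type.
    for shape in read_routes():
        yield shape.to_dict()

bulk(es, read_route_sources(), index="shapes", doc_type="shape")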