Dec 9th, 2020: [EN] Don't let your Christmas tree Rust in a corner

Christmas trees are part of what makes this time of the year so unique, and they bring a sparkle to the eyes of children and adults alike. But what do you do with the tree once the party’s over? The best option is to make sure it’s recycled and used as compost or firewood. Now, how do you find out where to dispose of your tree so that it will be properly taken care of?

I live in Toulouse in the south of France, and the local authorities have not only set up a lot of collection places, but also published their location as open data. It’s even listed on the European open data portal. There’s a treasure trove of information there!

Let’s use this information to build a nice map in Kibana, using the Rust client for Elasticsearch to ingest the data. Why Rust? Because it’s a great language that is growing in popularity, and this is an opportunity to experiment!

The source code for all the code below is available on GitHub so that you can try it at home or on Elastic Cloud.

Show me your data!

We’ll start by looking at the JSON data format: it’s an array of records like this one (some fields we won’t use have been omitted; “commune” means city and “adresse”, well… you can guess it):

{
  "datasetid": "collecte-des-sapins-de-noel",
  "recordid": "ef89fdb5cbb3b397d2988b7d23c1fee5199b989c",
  "fields": {
    "commune": "TOULOUSE",
    "adresse": "88 all Jean Jaurès / angle rue Riquet",
    "geo_point_2d": [
      43.6089310498,
      1.45385907091
    ]
  }
}

The first two fields are identifiers, and the properties of the fields object are the data we want to use for our map. An important thing to note is that the coordinates are in (lat, lon) order.

Fetching, transforming and ingesting the data

Rust being a strongly typed language, let’s first set up data structures for this input data, along with a flatter structure that we will use to store the data in Elasticsearch. We will use the great serde library for JSON serialization and deserialization.

#[derive(Debug, Deserialize)]
struct SourcePlace {
    pub datasetid: String,
    pub recordid: String,
    pub fields: SourceFields,
}

#[derive(Debug, Deserialize)]
struct SourceFields {
    pub commune: String,
    pub adresse: String,
    pub geo_point_2d: (f64, f64), // lat, lon
}

#[derive(Debug, Serialize)]
struct IndexedPlace {
    pub dataset_id: String,
    pub record_id: String,
    pub city: String,
    pub street: String,
    pub location: (f64, f64), // lon, lat
}

Now that our data structures are defined, we can write the code!

const DATA_URL: &str = "https://data.toulouse-metropole.fr/explore/dataset/collecte-des-sapins-de-noel/download/?format=json";
const INDEX_NAME: &str = "xmas-tree-recycling";

#[tokio::main]
async fn main() -> anyhow::Result<()> {

Since the Elasticsearch Rust client is asynchronous and uses reqwest for HTTP, which itself relies on tokio for async networking, the easiest way to set up the environment is to make our main function async and let tokio drive it.
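
For completeness, here are the use statements and dependencies that the snippets in this post assume (exact module paths may vary slightly between client versions, so treat this as a sketch rather than gospel):

// Crates assumed in Cargo.toml: elasticsearch, serde, serde_json, reqwest,
// tokio (with the "macros" feature) and anyhow.
use anyhow::anyhow;
use elasticsearch::{
  http::transport::Transport,
  indices::IndicesCreateParts,
  BulkOperation, BulkParts, Elasticsearch,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};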

Let’s now set up the Elasticsearch client. To keep the configuration simple, we get the cluster’s URL from an environment variable:

// Use the URL (including login/password) from the ELASTICSEARCH_URL env variable
let es_url = std::env::var("ELASTICSEARCH_URL")?;
let es_client = Elasticsearch::new(Transport::single_node(&es_url)?);

We use Rust’s ? operator whenever a function returns a Result, to exit early in case of error and bubble it up out of main, where it will be printed on standard error.
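
As a tiny illustration of that pattern, here is a hypothetical helper (not part of the program above) that propagates an error the same way:

// If the environment variable is missing, `?` returns the error to the caller
// right away instead of panicking; main then reports it on standard error.
fn elasticsearch_url() -> anyhow::Result<String> {
  let url = std::env::var("ELASTICSEARCH_URL")?;
  Ok(url)
}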

Before ingesting data we need to create the index with a geo point mapping for the coordinates so that we can use it in Kibana maps:

es_client.indices()
  .create(IndicesCreateParts::Index(INDEX_NAME))
  .body(json!({
    "mappings": {
      "properties": {
        "location": { "type": "geo_point" }
      }
    }
  }))
  .send().await?
  .error_for_status_code()?;

es_client.indices() gives us a “namespace client” for all things related to index management. We then provide the index name, and the index mapping. We use serde’s json! macro to build the request body as the Rust client doesn’t yet have strongly typed definitions for all APIs.

We then send the request and await its response. Calling error_for_status_code() turns a response that came back with an error status code (for example 401 for authentication failures) into a Rust error.

All good, our index is set up, so we can now fetch the data that we want to store in it!

let response = reqwest::get(DATA_URL).await?
  .error_for_status()?;

let places: Vec<SourcePlace> = 
  serde_json::from_slice(&response.bytes().await?)?;

We simply GET the data with the reqwest library, and parse the JSON response bytes as a vector of SourcePlace structures with the serde library.

Let’s now do a bit of data munging to flatten the fields and, more importantly, to provide the geo coordinates in (lon, lat) order as Elasticsearch expects, since the source provides them as (lat, lon).

let indexed_places = places.into_iter()
    .map(|place| IndexedPlace {
        dataset_id: place.datasetid,
        record_id: place.recordid,
        city: place.fields.commune,
        street: place.fields.adresse,
        location: (
            place.fields.geo_point_2d.1, // swap to (lon, lat) order
            place.fields.geo_point_2d.0,
        ),
    });

We can now index all this data in bulk. We’ll set the target index on the bulk request so we don’t have to repeat it for each bulk insert operation.

println!("Storing data.");
let response = es_client
  .bulk(BulkParts::Index(INDEX_NAME))
  .body(
    // create a bulk indexing operation for each place
    indexed_places.map(|place|
      BulkOperation::from(BulkOperation::index(place))
    ).collect()
  )
  .send().await?
  .error_for_status_code()?;

Notice that we directly use an IndexedPlace object to build the index operation. Since that type implements Serialize, the Rust client will automatically convert it to JSON.
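
If you’re curious about what actually goes into each bulk operation, you can serialize one place by hand. This is purely an illustration, reusing the IndexedPlace struct defined above and the sample record from the beginning of the post (run it inside main so that ? has somewhere to propagate to):

// Illustration only: serialize one place the same way the client does.
let sample = IndexedPlace {
  dataset_id: "collecte-des-sapins-de-noel".to_string(),
  record_id: "ef89fdb5cbb3b397d2988b7d23c1fee5199b989c".to_string(),
  city: "TOULOUSE".to_string(),
  street: "88 all Jean Jaurès / angle rue Riquet".to_string(),
  location: (1.45385907091, 43.6089310498), // (lon, lat)
};
println!("{}", serde_json::to_string_pretty(&sample)?);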

As before, we check the response status code. But for bulk requests this isn’t enough, as errors are reported individually within an otherwise successful response. So we’ll parse the response and check the top-level errors property, which indicates whether any operation failed, and return an error with the full JSON response if one did:

let bulk_response = response.json::<JsonValue>().await?;
if bulk_response["errors"] == JsonValue::Bool(true) {
  return Err(anyhow!("Failed to store data: {}", bulk_response));
}

If we reach that point without errors, the data has been indexed, and our ingestion work is done!

  println!("Done!");
  Ok(())
}

Displaying the data on a map

Let’s now create a nice map out of our freshly ingested data! This all happens in Kibana, and we’ll use the latest (and greatest) version 7.10.0.

First we need to declare an index pattern for our xmas-tree-recycling index in "Management / Stack management / Kibana / Index patterns". You will notice that the location field has been correctly indexed as geo_point.
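
If you’d rather verify this from code than from the UI, a minimal sketch using the client’s get_mapping API could look like the following (it assumes the IndicesGetMappingParts import from the indices namespace; adjust to your client version):

// Fetch the index mapping and print the `location` field definition,
// which should show "type": "geo_point".
let mapping = es_client.indices()
  .get_mapping(IndicesGetMappingParts::Index(&[INDEX_NAME]))
  .send().await?
  .json::<JsonValue>().await?;
println!("{}", mapping[INDEX_NAME]["mappings"]["properties"]["location"]);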

Next, go to "Kibana / Maps" to create a new map. We will now add a "Documents" layer.

Select the xmas-tree-recycling index pattern, and Kibana will automatically find the location field.

The next steps are to give a name to this layer and add the city and street fields as map point tooltips.

We want more than boring circles on our map, so as a finishing touch, choose a tree icon and green colors in the "Layer Style" section!

Save this layer, zoom in on France and then Toulouse, and here we are: a nice map with all the Christmas tree recycling locations!

Conclusion

This simple example showed that it’s rather easy to grab a remote data source, ingest it into Elasticsearch, and build a nice map in no time.

The Rust language may seem complicated at first, particularly if you’re used to dynamically typed languages, but its strong type system and super-strict compiler allow you to build extremely robust applications. If it compiles, it runs, unless there’s a bug in your application logic. And Rust produces small, very efficient executables: the binary for this program is a tiny 4 MB! Compare that to your usual Java classpath or node_modules directory...

The Rust Elasticsearch client provides methods for all Elasticsearch API endpoints with strong typing of request path and query parameters. Strong typing of request and response bodies is still a work in progress and is the current focus of the Clients team at Elastic.

Post-scriptum

If you had a look at the various formats offered for this data set, you may have noticed that it’s available as GeoJSON. And Kibana Maps allows you to directly upload GeoJSON files...

Ahem…

Is this to say that all this code was useless? Well, a learning exercise is always useful, and if you’ve read this far you’ve certainly learned a thing or two 🙂

Happy holiday season!
