Dec 23rd, 2021: [es] Aumentando documentos con la política geo_match

:speaking_head: This article is also available in English

Como tal vez sepas, el 19 de Septiembre de 2021 comenzó la erupción del Cumbre Vieja en la isla de La Palma en las Islas Canarias, España. El gobierno de la isla comenzó inmediatamente a publicar levantamientos fotogramétricos con drones de los perímetros de los flujos de lava en su portal de Datos Abiertos (ejemplo). Mi idea fue entonces cargarlos en el stack de Elastic para tratar de entender la evolución de este desastre natural.

Además de los flujos, existe también un juego de datos en el portal con los edificios, por lo que vi que era una buena oportunidad para intersectar los flujos de lava con los edificios para ver cómo la erupción estaba afectando a viviendas, fábricas, etc.

Presentando la política geo_match para el enrich processor

En una pipeline de ingesta se puede utilizar enrich processor para aumentar datos desde un índice de referencia a partir de un campo con contenido común o mediante una relación espacial entre los documentos del índice de refencia y los documentos siendo ingestados. Con la política geo_match, a partir de un índice con un campo geométrico, podemos transferir datos de estos documentos de referencia a un documento ingestado a partir de una relación espacial. En general lo que queremos es usar la relación de intersección espacial. Es decir, dado por ejemplo un índice de refencia con los límites de países, podemos transferir el nombre del país a cualquier documento con una geometría que esté dentro de dichos límites. Puedes aprender más sobre esta política en este ejemplo de la documentación de Elasticsearch y en un tutorial más largo sobre cómo usar esta pólitica para hacer geocodificación inversa en esta entrada de blog.

En este caso que nos ocupa el índice de referencia serán los flujos de lava y los documentos aumentados serán los edificios.

Preparando los perímetros de los flujos de lava

El primer asunto a atender es la carga del índice de referencia que se usará en la política geo_match. Conforme la erupción ha ido avanzando, el área de los perímetros de lava lo ha ido haciendo también. Es decir, dada una posición, varios perímetros pueden intersectar con ella, pero nosotros sólo queremos el primer perímetro publicado que afectó a la posición.

Por otro lado la carga de datos necesitaba de automatización para atender algunos problemas de formato y datos. En la automatización añadí un paso para calcular la diferencia entre un perímetro y su predecesor, obteniendo algo así como un anillo en un nuevo campo de tipo geo_shape llamado diff_geometry. De esta forma, para este campo, una localización podría tener como máximo un único perímetro.

Este es el código que encadena los métodos difference y simplify de la bibiloteca shapely para producir la geometría intersectada.

Cargando los edificios

El script de procesado de datos carga los edificios si no se encuentran en el cluster. Tras descargar los datos del portal y asegurar que sus geometrías son válidas, el bulk helper del cliente Python de Elasticsearch se encarga de la subida de datos.

Definiendo la politica de aumento de datos y la pipeline de ingesta

Los perímetros se almacenan en un índice llamado lapalma, y queremos transferir a los edificios los campos id y timestamp, por lo que la política sería:

PUT /_enrich/policy/lapalma_lookup
{
  "geo_match": {
    "indices": "lapalma",
    "match_field": "diff_geometry",
    "enrich_fields": ["id", "timestamp"],
  }
}

El siguiente paso es ejecutar la política:

POST /_enrich/policy/lapalma_lookup/_execute

:warning: ¡Importante! :warning:: Debemos ejecutar este último paso cada vez que haya una actualización en el índice de los perímetros de lava.

Con la política preparada, podemos añadirla a una pipeline de ingesta. Esta pipeline espera un campo de tipo geométrico que intersectará con el definido en la política. La política creará un nuevo campo llamado footprints en el que se almacenarán tanto el identificador como la fecha del índice de referencia. La pipeline en un segundo paso borrará la geometría del perímetro, ya que ésta se añade por defecto y no la necesitamos en el documento enriquecido.

PUT _ingest/pipeline/buildings_footprints
{
  "description": "Enrich buildings with Cumbre Vieja footprints.",
  "processors": [
    {
      "enrich": {
        "field": "geometry",
        "policy_name": "lapalma_lookup",
        "target_field": "footprints",
        "shape_relation": "INTERSECTS",
        "ignore_missing": True,
        "ignore_failure": True,
      }
    },
    {
      "remove": {
        "field": "footprints.diff_geometry",
        "ignore_missing": True,
        "ignore_failure": True,
        "description": "Remove the shape field",
      }
    },
  ],
}

Ejecutando la pipeline de ingesta

Ahora que ya tenemos nuestra pipeline lista, vamos a actualizar el índice de los edicifios utilizando la instrucción _update_by_query. Para optimizar el proceso no vamos a actualizar todos los documentos del índice, sino que vamos a restringir la consulta buscando los documentos que satisfacen dos condiciones:

  1. Se encuentran dentro de las coordenadas que afectan a la zona de la erupción
  2. No tienen un identificador de perímetro asignado
POST buildings/_update_by_query?pipeline=buildings_footprints
{
  "query": {
    "bool": {
      "must_not": [ { "exists": { "field": "footprints.id" } } ],
      "filter": {
        "geo_bounding_box": {
          "geometry": {
            "top_left": { "lat": 28.647, "lon": -17.95 },
            "bottom_right": { "lat": 28.58, "lon": -17.83 },
          }
        }
      },
    }
  }
}

Todo junto

A lo largo de esta entrada he utilizado la sintaxis de las Kibana DevTools para facilitar su lectura, pero todas estas peticiones se ejecutan desde un script en python dentro de una Github Action tras cada cambio en el fichero que contiene los identificadores de los perímetros. Por desgracia no hay una forma sencilla de automatizar la publicación de nuevos perímetros, por lo que estos identificadores se tienen que introducir manualmente para a continuación automáticamente disparar todo el proceso de actualización (commit de ejemplo).

Con los datos cargados en el cluster, ya podemos explorarlos con Elastic Maps y con un cuadro de mandos de Kibana.

05-animation-buildings
https://ela.st/cumbre-vieja-eruption-map


https://ela.st/cumbre-vieja-eruption

¡Esto es todo! :wave:

1 Like