Dec 1st, 2018: [FR][Logstash] Enrichir ses adresses postales

Enrichir ses adresses postales

Imaginons que nous ayons indexé dans Elasticsearch des documents qui représentent des adresses postales françaises sous la forme :

{
  "address": {
    "zipcode": "17000",
    "number": "23",
    "city": "La Rochelle",
    "street_name": "Rue Verdière"
  },
  "location": {
    "lon": -1.155167,
    "lat": 46.157353
  }
}

Avec un mapping associé tel que :

{
  "properties" : {
    "address": {
      "properties" : {
        "city": {
          "type": "text"
        },
        "number": {
          "type": "keyword"
        },
        "street_name": {
          "type": "text"
        },
        "zipcode": {
          "type": "keyword"
        }
      }
    },
    "location": {
      "type": "geo_point"
    }
  }
}

Nous aimerions maintenant pouvoir enrichir nos propres données métier pour trouver la "meilleure" localisation géographique en fonction des adresses de nos utilisateurs.

Et si on cherchait ?

Nous avons sous la main le meilleur moteur de recherche du marché... Autant l'utiliser pour trouver la bonne adresse.

Par exemple, pour chercher une adresse pour laquelle notre utilisateur a saisi "23", "r verdiere", "rochelle", nous pouvons lancer la requête suivante :

GET .bano/_search?search_type=dfs_query_then_fetch
{
  "size": 1,
  "query": {
    "bool": {
      "should": [
        {
          "match": {
            "address.number": "23"
          }
        },
        {
          "match": {
            "address.street_name": "r verdiere"
          }
        },
        {
          "match": {
            "address.city": "rochelle"
          }
        }
      ]
    }
  }
}

Cela nous ramène l'adresse la plus proche correspondant, à savoir :

{
  "address": {
    "zipcode": "17000",
    "number": "23",
    "city": "La Rochelle",
    "street_name": "Rue Verdière"
  },
  "location": {
    "lon": -1.155167,
    "lat": 46.157353
  }
}

Logstash pour la transformation à la volée

Mettons cela en place dans Logstash en exposant un service HTTP qui pourra être par la suite appelé depuis notre application par exemple. Notez qu'on pourrait appeler directement Elasticsearch depuis notre application.

input {
  http { }
}

filter {
  elasticsearch {
    query_template => "search-by-name.json"
    index => ".bano"
    fields => {
      "location" => "[location]"
      "address" => "[address]"
    }
    remove_field => ["headers", "host", "@version", "@timestamp"]
  }
}

output {
  stdout { codec => rubydebug }
}

Le fichier search-by-name.json est équivalent à la requête elasticsearch que nous avions vu précédemment :

{
  "size": 1,
  "query":{
    "bool": {
      "should": [
        {
          "match": {
            "address.number": "%{[address][number]}"
          }
        },
        {
          "match": {
            "address.street_name": "%{[address][street_name]}"
          }
        },
        {
          "match": {
            "address.city": "%{[address][city]}"
          }
        }
      ]
    }
  }
}

Si nous appelons l'API HTTP :

curl -XPOST "localhost:8080" -H "Content-Type: application/json" -d '{
  "test_case": "Address with text",
  "name": "Joe Smith",
  "address": {
    "number": "23",
    "street_name": "r verdiere",
    "city": "rochelle",
    "country": "France"
  }
}'

Nous obtenons :

{
    "test_case" => "Address with text",
     "location" => {
        "lon" => -1.155167,
        "lat" => 46.157353
    },
         "name" => "Joe Smith",
      "address" => {
               "city" => "La Rochelle",
        "street_name" => "Rue Verdière",
            "zipcode" => "17000",
             "number" => "23"
    }
}

Il vous suffit maintenant de changer la partie input pour lire votre source de données, par exemple depuis une base de données :

jdbc {
  jdbc_driver_library => "mysql-connector-java-6.0.6.jar"
  jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/person?useSSL=false"
  jdbc_user => "root"
  jdbc_password => ""
  schedule => "* * * * *"
  parameters => { "country" => "France" }
  statement => "SELECT p.id, p.name, p.dateOfBirth, p.gender, p.children, a.city, a.country, a.countrycode, a.lat, a.lon, a.zipcode FROM Person p, Address a WHERE a.id = p.address_id AND a.country = :country AND p.id > :sql_last_value"
   use_column_value => true
   tracking_column => "id"
}

Et voilà !

2 Likes