Dec 5th, 2023: [FR] Père Noël ? Il est temps de partir ! - TTL avec Elasticsearch

This post is also available in english.

Imaginez que le père Noël doit livrer des cadeaux à tous les enfants du monde. Il a beaucoup de travail à faire et il doit être efficace. Il a une liste de tous les enfants et il sait où ils vivent. Il va très probablement regrouper les cadeaux par région, puis il les livrera. Mais il ne restera pas au même endroit trop longtemps. Il va juste déposer les cadeaux et partir. Il n'attendra pas que les enfants ouvrent les cadeaux. Il partira juste.

Peut-être que nous pourrions lui suggérer de faire une liste des villes qu'il doit encore visiter. Et puis il pourrait retirer les villes de la liste une fois qu'il a livré les cadeaux. De cette façon, il saura où il doit encore aller. Et il ne perdra pas de temps à revenir au même endroit.

Pour faire cela, il pourrait utiliser un TTL (Time To Live) sur les villes qu'il doit visiter. Il va définir le TTL sur le temps dont il a besoin pour livrer les cadeaux. Et puis il va retirer les villes de la liste une fois que le TTL est expiré.

Voici à quoi pourrait ressembler son voyage :

{ "city": "Sidney", "deliver": "ASAP", "ttl": 0 }
{ "city": "Singapour", "deliver": "1 minute", "ttl": 1 }
{ "city": "Vilnius", "deliver": "3 minutes", "ttl": 3 }
{ "city": "Paris", "deliver": "5 minutes", "ttl": 5 }
{ "city": "Londres", "deliver": "6 minutes", "ttl": 6 }
{ "city": "Montréal", "deliver": "7 minutes", "ttl": 7 }
{ "city": "San Francisco", "deliver": "9 minutes", "ttl": 9 }
{ "city": "Pôle Nord", "deliver": "pour toujours" }

Le champ ttl contient la valeur du TTL en minutes :

  • aucune valeur signifie que le document sera conservé pour toujours.
  • zéro signifie que nous voulons supprimer le document dès que possible.
  • toute valeur positive correspond au nombre de minutes que nous devons attendre avant de supprimer le document.

Le pipeline d'ingestion ttl

Pour mettre en œuvre une telle fonctionnalité, vous avez juste besoin d'un pipeline d'ingestion :

DELETE /_ingest/pipeline/ttl
PUT /_ingest/pipeline/ttl
{
  "processors": [
    {
      "set": {
        "field": "ingest_date",
        "value": "{{{_ingest.timestamp}}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          ctx['ttl_date'] = ZonedDateTime.parse(ctx['ingest_date']).plusMinutes(ctx['ttl']);
        """,
        "if": "ctx?.ttl != null"
      }
    },
    {
      "remove": {
        "field": [ "ingest_date", "ttl" ],
        "ignore_missing": true
      }
    }
  ]
}

Expliquons cela un peu.

Le premier processeur définit un champ temporaire (ingest_date) dans le document et nous y injectons l'heure à laquelle le pipeline est exécuté (_ingest.timestamp), qui est, plus ou moins, la date d'indexation.

Ensuite, nous exécutons un script painless :

ctx['ttl_date'] = ZonedDateTime.parse(ctx['ingest_date']).plusMinutes(ctx['ttl']);

Ce script crée un objet java ZonedDateTime à partir de la valeur de type String disponible dans le champ ingest_date. Ensuite, nous appelons simplement la méthode plusMinutes et nous fournissons la valeur de ttl comme paramètre. Cela va simplement décaler la date d'ingestion de quelques minutes. Et nous stockons le résultat dans un nouveau champ ttl_date.

Notez que nous devons ajouter une condition pour exécuter ce processeur uniquement si un champ ttl existe :

"if": "ctx?.ttl != null"

Ensuite, nous supprimons simplement les champs non nécessaires ingest_date et éventuellement ttl si nous n'en avons plus besoin. Notez que pour des raisons de débogage, il peut être judicieux de conserver ttl dans le document. Dans le cas où aucun ttl n'est défini, nous devons également l'ignorer s'il est manquant. Cela se fait avec le paramètre "ignore_missing": true.

Pour tester ce pipeline, nous pouvons utiliser l'API simulate :

POST /_ingest/pipeline/ttl/_simulate?filter_path=docs.doc._source,docs.doc._ingest
{
  "docs": [
    {
      "_source": { "city": "Sidney", "deliver": "ASAP", "ttl": 0 }
    },
    {
      "_source": { "city": "Singapour", "deliver": "1 minute", "ttl": 1 }
    },
    {
      "_source": { "city": "Paris", "deliver": "5 minutes", "ttl": 5 }
    },
    {
      "_source": { "city": "Pôle Nord", "deliver": "pour toujours" }
    }
  ]  
}

Cela donne :

{
  "docs": [
    {
      "doc": {
        "_source": {
          "deliver": "ASAP",
          "ttl_date": "2023-11-23T11:14:42.723353333Z",
          "city": "Sidney"
        },
        "_ingest": {
          "timestamp": "2023-11-23T11:14:42.723353333Z"
        }
      }
    },
    {
      "doc": {
        "_source": {
          "deliver": "1 minute",
          "ttl_date": "2023-11-23T11:15:42.723413177Z",
          "city": "Singapour"
        },
        "_ingest": {
          "timestamp": "2023-11-23T11:14:42.723413177Z"
        }
      }
    },
    {
      "doc": {
        "_source": {
          "deliver": "5 minutes",
          "ttl_date": "2023-11-23T11:19:42.723419835Z",
          "city": "Paris"
        },
        "_ingest": {
          "timestamp": "2023-11-23T11:14:42.723419835Z"
        }
      }
    },
    {
      "doc": {
        "_source": {
          "city": "Pôle Nord",
          "deliver": "pour toujours"
        },
        "_ingest": {
          "timestamp": "2023-11-23T11:14:42.723423778Z"
        }
      }
    }
  ]
}

Nous pouvons voir les dates ainsi décalées pour la suppression des documents.

Définir automatiquement le champ ttl_date

Nous pouvons utiliser le final_pipeline index setting pour définir le pipeline ttl comme celui à utiliser juste avant l'opération d'indexation réelle.

DELETE /ttl-demo
PUT /ttl-demo
{
  "settings": {
    "final_pipeline": "ttl"
  },
  "mappings": {
    "_source": {
      "excludes": [
        "ttl_date"
      ]
    },
    "properties": {
      "ttl_date": {
        "type": "date"
      }
    }
  }
}

Vous pouvez également utiliser le default_pipeline index setting mais vous devez savoir que le pipeline ttl ne sera pas appelé si un utilisateur souhaite indexer un document avec un pipeline autre comme :

POST /ttl-demo/_doc?pipeline=my-pipeline
{ 
  "city": "Singapour", 
  "deliver": "1 minute", 
  "ttl": 1
}

Notez que nous supprimons le champ ttl_date du champ _source. Nous ne voulons pas le stocker dans le champ _source car c'est juste un champ "technique".

Indexer les documents

Nous pouvons maintenant injecter notre jeu de données :

POST /ttl-demo/_bulk
{ "index": {} }
{ "city": "Sidney", "deliver": "ASAP", "ttl": 0 }
{ "index": {} }
{ "city": "Singapour", "deliver": "1 minute", "ttl": 1 }
{ "index": {} }
{ "city": "Vilnius", "deliver": "3 minutes", "ttl": 3 }
{ "index": {} }
{ "city": "Paris", "deliver": "5 minutes", "ttl": 5 }
{ "index": {} }
{ "city": "Londres", "deliver": "6 minutes", "ttl": 6 }
{ "index": {} }
{ "city": "Montréal", "deliver": "7 minutes", "ttl": 7 }
{ "index": {} }
{ "city": "San Francisco", "deliver": "9 minutes", "ttl": 9 }
{ "index": {} }
{ "city": "Pôle Nord", "deliver": "pour toujours" }

Effacer les documents expirés

Il est maintenant facile d'exécuter un appel Delete By Query :

POST /ttl-demo/_delete_by_query
{
  "query": {
    "range": {
      "ttl_date": {
        "lte": "now"
      }
    }
  }
}

Nous voulons juste supprimer tous les documents qui ont une date d'expiration inférieure ou égale à now qui est l'heure de l'exécution de la requête.

Si nous l'exécutons immédiatement, nous pouvons voir que seul le document { "city": "Sidney", "deliver": "ASAP", "ttl": 0 } est supprimé.
Après une minute, { "city": "Singapour", "deliver": "1 minute", "ttl": 1 }. Et après quelques autres minutes, seul { "city": "Pôle Nord", "deliver": "pour toujours" } reste. Il sera conservé pour toujours.

Utiliser Watcher pour purger toutes les minutes

Vous pouvez utiliser un crontab pour exécuter une telle requête toutes les minutes :

* * * * * curl -XPOST -u elastic:changeme https://127.0.0.1:9200/ttl-demo/_delete_by_query -H 'Content-Type: application/json' -d '{"query":{"range":{"ttl_date":{"lte":"now"}}}}'

Notez que vous devrez surveiller ce travail. Mais si vous avez une licence commerciale, vous pouvez également l'exécuter directement depuis Elasticsearch en utilisant Watcher :

PUT _watcher/watch/ttl
{
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "simple" : {}
  },
  "condition": {
    "always" : {}
  },
  "actions": {
    "call_dbq": {
      "webhook": {
        "url": "https://127.0.0.1:9200/ttl-demo/_delete_by_query",
        "method": "post",
        "body": "{\"query\":{\"range\":{\"ttl_date\":{\"lte\":\"now\"}}}}",
        "auth": {
          "basic": {
            "username": "elastic",
            "password": "changeme"
          }
        }
      }
    }
  }
}

Notez également que nous utilisons le paramètre interval pour exécuter cette action toutes les minutes. Et nous utilisons un webhook pour appeler l'API Delete By Query. Notez enfin que nous devons également fournir les informations d'authentification.

Si vous ne faites pas tourner votre cluster sur cloud.elastic.co mais localement avec un certificat auto-signé, comme Elasticsearch est sécurisé par défaut, vous devez définir xpack.http.ssl.verification_mode sur none. Sinon, Elasticsearch ne va pas accepter le certificat auto-signé. Bien sûr, c'est juste pour les tests. Ne faites surtout pas ça en production !

Après au plus une minute, le père Noël verra maintenant que les villes qu'il doit visiter sont supprimées de la liste :

GET /ttl-demo/_search
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 6,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "ttl-demo",
        "_id": "IBTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "3 minutes",
          "city": "Vilnius"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "IRTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "5 minutes",
          "city": "Paris"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "IhTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "6 minutes",
          "city": "Londres"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "IxTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "7 minutes",
          "city": "Montréal"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "JBTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "deliver": "9 minutes",
          "city": "San Francisco"
        }
      },
      {
        "_index": "ttl-demo",
        "_id": "JRTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "city": "Pôle Nord",
          "deliver": "pour toujours"
        }
      }
    ]
  }
}

Et à la fin, il ne restera que sa maison :

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "ttl-demo",
        "_id": "JRTn-4sBOKvQy-0aU35M",
        "_score": 1,
        "_source": {
          "city": "Pôle Nord",
          "deliver": "forever"
        }
      }
    ]
  }
}

C'est une solution simple et rapide pour supprimer les anciennes données de votre cluster Elasticsearch. Mais notez que vous ne devez jamais appliquer cette technique pour les logs ou tout autre index temporel ou si la quantité de données à supprimer est disons, supérieur à 10% de l'ensemble de vos données.

Au lieu de cela, vous devriez préférer l'API Delete Index pour supprimer un index complet d'un coup au lieu de supprimer un ensemble complet de documents. C'est beaucoup plus efficace.

Supprimer l'index (préféré)

Pour ce faire, nous pouvons en fait modifier le pipeline pour envoyer les données vers un index dont le nom contient la date ttl :

PUT /_ingest/pipeline/ttl
{
  "processors": [
    {
      "set": {
        "field": "ingest_date",
        "value": "{{{_ingest.timestamp}}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          ctx['ttl_date'] = ZonedDateTime.parse(ctx['ingest_date']).plusDays(ctx['ttl']);
        """,
        "ignore_failure": true
      }
    },
    {
      "date_index_name" : {
        "field" : "ttl_date",
        "index_name_prefix" : "ttl-demo-",
        "date_rounding" : "d",
        "date_formats": ["yyyy-MM-dd'T'HH:mm:ss.nz"],
        "index_name_format": "yyyy-MM-dd",
        "if": "ctx?.ttl_date != null"
      }
    },
    {
      "set": {
        "field" : "_index",
        "value": "ttl-demo-forever",
        "if": "ctx?.ttl_date == null"
      }
    },
    {
      "remove": {
        "field": [ "ingest_date", "ttl", "ttl_date" ],
        "ignore_missing": true
      }
    }
  ]
}

Dans cet exemple, j'ai basculé vers des index quotidiens car c'est beaucoup plus cohérent avec ce que vous pourriez voir en production car vous n'expirez normalement pas les données après quelques minutes mais plutôt après quelques jours ou mois.

Nous avons changé le script pour ajouter des jours :

ctx['ttl_date'] = ZonedDateTime.parse(ctx['ingest_date']).plusDays(ctx['ttl']);

Si le champ ttl_date existe, nous utilisons le processeur date_index_name pour créer un nouveau nom d'index basé sur le champ ttl_date. Nous utilisons le paramètre date_rounding pour arrondir la date au jour. Et nous utilisons le paramètre index_name_format pour formater la date comme yyyy-MM-dd. Cela utilisera des noms d'index comme ttl-demo-2023-11-27 :

{
  "date_index_name" : {
    "field" : "ttl_date",
    "index_name_prefix" : "ttl-demo-",
    "date_rounding" : "d",
    "date_formats": ["yyyy-MM-dd'T'HH:mm:ss.nz"],
    "index_name_format": "yyyy-MM-dd",
    "if": "ctx?.ttl_date != null"
  }
}

Si le champ ttl_date n'existe pas, nous définissons simplement le nom de l'index sur ttl-demo-forever :

{
  "set": {
    "field" : "_index",
    "value": "ttl-demo-forever",
    "if": "ctx?.ttl_date == null"
  }
}

Nous pouvons réindexer notre jeu de données :

POST /ttl-demo/_bulk?pipeline=ttl
{ "index": {} }
{ "city": "Sidney", "deliver": "ASAP", "ttl": 0 }
{ "index": {} }
{ "city": "Singapore", "deliver": "1 day", "ttl": 1 }
{ "index": {} }
{ "city": "Vilnius", "deliver": "3 days", "ttl": 3 }
{ "index": {} }
{ "city": "Paris", "deliver": "5 days", "ttl": 5 }
{ "index": {} }
{ "city": "North Pole", "deliver": "forever" }

And we can see that the documents are now in different indices:

Et nous pouvons voir que les documents sont maintenant dans des index différents :

GET /ttl-demo-*/_search?filter_path=hits.hits._index,hits.hits._source.deliver

Donne :

{
  "hits": {
    "hits": [
      {
        "_index": "ttl-demo-2023-11-27",
        "_source": {
          "deliver": "ASAP"
        }
      },
      {
        "_index": "ttl-demo-2023-11-28",
        "_source": {
          "deliver": "1 day"
        }
      },
      {
        "_index": "ttl-demo-2023-11-30",
        "_source": {
          "deliver": "3 days"
        }
      },
      {
        "_index": "ttl-demo-2023-12-02",
        "_source": {
          "deliver": "5 days"
        }
      },
      {
        "_index": "ttl-demo-forever",
        "_source": {
          "deliver": "forever"
        }
      }
    ]
  }
}

Le nom de l'index ne fait plus référence à la date des données mais à la date de suppression des données. Nous pouvons donc exécuter à nouveau un crontab pour supprimer les anciens index tous les jours. Le script suivant est destiné à être exécuté sur un système Mac OS X :

0 0 * * * curl -XDELETE -u elastic:changeme https://127.0.0.1:9200/ttl-demo-$(date -v -1d -j +%F)

Conclusion

Nous avons vu 2 façons de faire un TTL sur les documents Elasticsearch. La première consiste à utiliser un champ TTL et à supprimer les documents à l'aide d'un appel Delete By Query. La seconde (beaucoup plus efficace lorsqu'il y a beaucoup de données à supprimer) consiste à utiliser un champ TTL pour router les documents vers des index différents, puis à supprimer les index à l'aide d'un crontab.

Mais pour ces 2 solutions, les documents sont toujours visibles jusqu'à ce que le contrab s'exécute.

Vous pourriez penser à utiliser un alias filtré pour masquer les anciens documents :

POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "ttl-demo",
        "alias": "ttl-filtered",
        "filter": {
          "bool": {
            "filter": [
              {
                "range": {
                  "ttl_date": {
                    "gt": "now/m"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]
}

Chercher dans l'alias ttl-filtered ne renverra que les documents qui ne sont pas encore expirés même si les documents expirés ne sont pas encore supprimés par le processus par lots (crontab ou watcher).

Le père Noël peut maintenant savoir où aller ensuite en toute sécurité puis profiter d'une année de repos bien méritée !

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.