• 6 heures
  • Moyenne

Ce cours est visible gratuitement en ligne.

course.header.alt.is_certifying

J'ai tout compris !

Mis à jour le 03/12/2024

Utilisez des tâches avancées

Kestra offre de nombreux avantages concernant la création de flows avancés : exécution en parallèle, création de briques réutilisables, gestion des erreurs, etc. Dans ce chapitre, nous allons explorer certaines des tâches les plus utiles pour gérer des projets complexes avec Kestra.

Créez des subflows

Les Subflows vous permettent de construire des composants modulaires et réutilisables que vous pouvez utiliser dans plusieurs flux. Par exemple, vous pouvez avoir un Subflow dédié à l'alerte des erreurs sur Slack et par e-mail. En utilisant un Subflow, vous pouvez réutiliser ces deux tâches ensemble pour tous les flux auxquels vous souhaitez envoyer des notifications d'erreur, au lieu de devoir copier les tâches individuelles pour chaque flux.

  1. Pour appeler un flux à partir d'un autre flux, utilisez la tâche io.kestra.plugin.core.flow.Subflow. Dans cette tâche, spécifiez-le flowId et lenamespace du Subflow que vous souhaitez exécuter. Vous pouvez également spécifier des inputs personnalisés de manière optionnelle, de la même manière que vous passeriez des arguments dans un appel de fonction.

  2. Les propriétés optionnelles wait et transmitFailed contrôlent le comportement de l'exécution. Par défaut, si wait n'est pas défini ou défini sur false, le flux parent continue l'exécution sans attendre la fin du Subflow. La propriététransmitFailed détermine si un échec dans l'exécution du Subflow doit entraîner un échec du flux parent.

Exécutez des tâches en parallèle

Que ce soit pour ne pas se répéter, lancer des tâches avec des valeurs de paramètres différents ou tout simplement pour accélérer un process : exécuter des tâches en parallèle est l’un des éléments majeurs de l’orchestration. La tâche flow.Parallel est la plus simple pour gérer ce type de comportement. Il suffit de renseigner les tâches qu’on veut lancer en parallèle :

id: parallel
namespace: open_class_room.kestra

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"
      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"
  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"

Néanmoins, cette tâche est assez limitée si nous avons 10 tâches ou plus à lancer ou même si le nombre de tâches est dynamique. Grâce à la tâcheForEach, vous pouvez exécuter des tâches en parallèle en fonction d’une liste. Une propriété concurrencyLimit permettant de contrôler le nombre d'exécutions concurrentes :

id: for_each_value
namespace: open_class_room.kestra

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: 
      - value 1
      - value 2
      - value 3
      - value 4
    concurrencyLimit: 2
    tasks:
      - id: return
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} with value '{{ taskrun.value }}'"

La valeur {{ taskrun.value }} permettant d’accéder à la valeur en cours de traitement.

L’une des limitations du ForEach est sa nature orientée tâche qui est non-flow. En effet, il est conseillé d’avoir plusieurs flows avec peu de tâches plutôt que peu de flow avec beaucoup de tâches. La tâche flow.ForEach garde tout le contexte du flow en mémoire pendant l’exécution. Cela peut réduire les performances si un trop grand nombre d’éléments sont traités.

Optimisez le traitement de fichier en parallèle

Pour régler cette limitation, nous pouvons utiliser une tâche spécifique pour créer des exécutions de flow distinctes. La tâche flow.ForEachItem permet d’exécuter des Subflow pour des lots d’items. Elle est donc particulièrement utile lorsque vous voulez traiter un fichier contenant beaucoup d’éléments par lot.

La tâche flow.ForEachItem se paramètre de la façon suivante :

  • items: il s’agit du fichier que vous voulez traiter ;

  • batch: la façon dont la tâche va séparer les différents éléments en lot ;

  • flowId et namespace: le subflow qui va être exécuté pour chaque lot.

Voici un exemple illustrant l’utilisation de la tâche flow.ForEachItem pour traiter les résultats d’une requête SQL ligne par ligne :

id: orders_parallel
namespace: open_class_room.kestra

tasks:
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true

  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.extract.uri }}"
    batch:
      rows: 1
    namespace: open_class_room.kestra
    flowId: orders
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    inputs:
      order: "{{ taskrun.items }}" # special variable that contains the items of the batch

Et voici le Subflow associé :

id: orders
namespace: company.team

inputs:
  - id: order
    type: STRING

tasks:
  - id: read_file
    type: io.kestra.plugin.scripts.shell.Commands
    runner: PROCESS
    commands:
      - cat "{{ inputs.order }}"

  - id: read_file_content
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.order) }}"

Gérez les erreurs

Si Julien a bien retenu une chose de ses expériences en data-engineering, c’est que les pipelines vont forcément tomber en erreur à un moment. Il est donc très important de gérer les cas d’erreurs dès la création de nos flux. Pour ce faire, Kestra offre deux types d'erreurs :

  • Global : Gestion d'erreurs globale d'un flux qui doit se trouver à la racine du flux.

  • Locale : Gestion d'erreurs locale d'une tâche Flowable, qui gérera les erreurs de la tâche Flowable et de ses enfants.

Gestion Global

Cet exemple de flux comporte une seule tâche Bash qui échoue immédiatement. La propriété globale errors du flow sera alors appelée pour que la deuxième tâche puisse s'exécuter.

id: erreurs 
namespace: open_class_room.kestra
tasks:
  - id: failed 
    type: io.kestra.plugin.scripts.shell.Commands 
    taskRunner: 
       type: io.kestra.plugin.core.runner.Process 
    commands: 
      - exit 1 
errors:
   - id: 2nd 
     type: io.kestra.plugin.core.log.Log 
     message: Echec de la tâche {{task.id}} 
     level: INFO

Vous pouvez gérer votre stratégie d’erreurs en renseignant plusieurs tâches. On peut imaginer appeler un autre flow, envoyer un message, effectuer une opération spécifique, etc.

Gestion Locale

Avec la gestion locale, vous pouvez être plus précis et gérer les erreurs au niveau d’une tâche Flowable. Dans cet exemple, la branche d'erreur ne sera utilisée que si un enfant de la tâche t2 a une erreur. Si la tâche t1 échoue, la branche d'erreur ne sera pas utilisée :

id: errors
namespace: open_class_room.kestra

tasks:
  - id: parent-seq
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: t1
        type: io.kestra.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.startDate}}"
      - id: t2
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: t2-t1
            type: io.kestra.plugin.scripts.shell.Commands
            taskRunner:
              type: io.kestra.plugin.core.runner.Process
            commands:
              - 'exit 1'
        errors:
          - id: error-t1
            type: io.kestra.plugin.core.debug.Return
            format: "Error Trigger ! {{task.id}}"

Cela peut être utile pour restreindre la gestion des erreurs à une partie spécifique du flux.

Retry

Parfois les erreurs sont liées à des services ou comportements indépendants de notre contrôle. Par exemple, une API qui n’est plus disponible, un comportement non déterministe dans une application tierce, etc.

La propriété retry gère ces échecs temporaires dans vos workflows. Elles sont définies au niveau de la tâche et peuvent être configurées pour réessayer une tâche un certain nombre de fois, ou avec un certain délai entre chaque tentative. L'exemple suivant définit une Retry pour la tâche retry-sample avec un maximum de 5 tentatives toutes les 15 minutes :

- id: retry-sample
  type: io.kestra.plugin.core.log.Log
  message: my output for task {{task.id}}
  timeout: PT10M
  retry:
    type: constant
    maxAttempt: 5
    interval: PT15M

D'autres types de Retries peuvent être utilisés, comme l'exponentiel et le random.

Les Retries peuvent aussi être paramétrées au niveau du flux - définissant des comportements de Retry par défaut pour chaque tâche du flux. Elles peuvent également être définies dans la configuration de l'instance Kestra pour configurer des Retries globalement pour toutes les tâches.

Nettoyez les artifacts Kestra

Les exécutions de flux créent des artifacts tels que des outputs, des fichiers dans le stockage interne ou des  logsPour  ne pas surcharger votre base de données et votre stockage, il est important de nettoyer ces artefacts après un certain temps. Kestra propose des tâches de Purge, permettant de purger les stockages, les logs, les exécutions.

Deux tâches de purge existent :

  • storage.PurgeExecution

  • storage.Purge

Regardons ces deux tâches de plus près :

La tâche PurgeExecution peut purger tous les fichiers stockés dans le contexte interne par une exécution. Elle peut être utilisée à la fin d'un flux pour purger tous les fichiers générés. Le contexte d'exécution lui-même ne sera pas disponible après la fin de l'exécution et sera automatiquement supprimé de Kestra après une période de rétention (par défaut, sept jours) qui peut être modifiée selon la configuration de la tâche.

La tâche storage.Purge peut être utilisée pour purger les stockages, les logs, les exécutions d'une exécution précédente. Par exemple, ce flux purgera tout cela chaque jour :

id: purge
namespace: open_class_room.kestra

tasks:
  - id: "purge"
    type: "io.kestra.plugin.core.storage.Purge"
    endDate: "{{ now() | dateAdd(-1, 'MONTHS') }}"

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"

En résumé

  • Kestra offre des tâches avancées permettant de contrôler vos workflow de façon précise : les subflow et les flowable de tâche en parallèle figurent parmi les plus importantes.

  • Les propriétés errors permettent de gérer les cas d’erreurs de façon personnalisée.

  • Les propriétés retry permettent de relancer des tâches automatiquement en fonction d’une cadence paramétrée.

  • Vous pouvez nettoyer et purger les artefacts créés par les exécutions avec les tâches de Purges.

Avec les erreurs et lesretry, nous avons vu un premier élément de gestion d’incident dans Kestra. Voyons maintenant comment envoyer des messages d’alertes.

Exemple de certificat de réussite
Exemple de certificat de réussite