Kestra offre de nombreux avantages concernant la création de flows avancés : exécution en parallèle, création de briques réutilisables, gestion des erreurs, etc. Dans ce chapitre, nous allons explorer certaines des tâches les plus utiles pour gérer des projets complexes avec Kestra.
Créez des subflows
Les Subflows vous permettent de construire des composants modulaires et réutilisables que vous pouvez utiliser dans plusieurs flux. Par exemple, vous pouvez avoir un Subflow dédié à l'alerte des erreurs sur Slack et par e-mail. En utilisant un Subflow, vous pouvez réutiliser ces deux tâches ensemble pour tous les flux auxquels vous souhaitez envoyer des notifications d'erreur, au lieu de devoir copier les tâches individuelles pour chaque flux.
Pour appeler un flux à partir d'un autre flux, utilisez la tâche
io.kestra.plugin.core.flow.Subflow
. Dans cette tâche, spécifiez-leflowId
et lenamespace
du Subflow que vous souhaitez exécuter. Vous pouvez également spécifier des inputs personnalisés de manière optionnelle, de la même manière que vous passeriez des arguments dans un appel de fonction.Les propriétés optionnelles
wait
ettransmitFailed
contrôlent le comportement de l'exécution. Par défaut, siwait
n'est pas défini ou défini sur false, le flux parent continue l'exécution sans attendre la fin du Subflow. La propriététransmitFailed
détermine si un échec dans l'exécution du Subflow doit entraîner un échec du flux parent.
Exécutez des tâches en parallèle
Que ce soit pour ne pas se répéter, lancer des tâches avec des valeurs de paramètres différents ou tout simplement pour accélérer un process : exécuter des tâches en parallèle est l’un des éléments majeurs de l’orchestration. La tâche flow.Parallel est la plus simple pour gérer ce type de comportement. Il suffit de renseigner les tâches qu’on veut lancer en parallèle :
id: parallel
namespace: open_class_room.kestra
tasks:
- id: parallel
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: 1st
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }} > {{ taskrun.startDate }}"
- id: 2nd
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }} > {{ taskrun.id }}"
- id: last
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }} > {{ taskrun.startDate }}"
Néanmoins, cette tâche est assez limitée si nous avons 10 tâches ou plus à lancer ou même si le nombre de tâches est dynamique. Grâce à la tâcheForEach
, vous pouvez exécuter des tâches en parallèle en fonction d’une liste. Une propriété concurrencyLimit
permettant de contrôler le nombre d'exécutions concurrentes :
id: for_each_value
namespace: open_class_room.kestra
tasks:
- id: for_each
type: io.kestra.plugin.core.flow.ForEach
values:
- value 1
- value 2
- value 3
- value 4
concurrencyLimit: 2
tasks:
- id: return
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }} with value '{{ taskrun.value }}'"
La valeur {{ taskrun.value }}
permettant d’accéder à la valeur en cours de traitement.
L’une des limitations du ForEach est sa nature orientée tâche qui est non-flow. En effet, il est conseillé d’avoir plusieurs flows avec peu de tâches plutôt que peu de flow avec beaucoup de tâches. La tâche flow.ForEach
garde tout le contexte du flow en mémoire pendant l’exécution. Cela peut réduire les performances si un trop grand nombre d’éléments sont traités.
Optimisez le traitement de fichier en parallèle
Pour régler cette limitation, nous pouvons utiliser une tâche spécifique pour créer des exécutions de flow distinctes. La tâche flow.ForEachItem
permet d’exécuter des Subflow pour des lots d’items. Elle est donc particulièrement utile lorsque vous voulez traiter un fichier contenant beaucoup d’éléments par lot.
La tâche flow.ForEachItem
se paramètre de la façon suivante :
items
: il s’agit du fichier que vous voulez traiter ;batch
: la façon dont la tâche va séparer les différents éléments en lot ;flowId
etnamespace
: le subflow qui va être exécuté pour chaque lot.
Voici un exemple illustrant l’utilisation de la tâche flow.ForEachItem
pour traiter les résultats d’une requête SQL ligne par ligne :
id: orders_parallel
namespace: open_class_room.kestra
tasks:
- id: extract
type: io.kestra.plugin.jdbc.duckdb.Query
sql: |
INSTALL httpfs;
LOAD httpfs;
SELECT *
FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
store: true
- id: each
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ outputs.extract.uri }}"
batch:
rows: 1
namespace: open_class_room.kestra
flowId: orders
wait: true # wait for the subflow execution
transmitFailed: true # fail the task run if the subflow execution fails
inputs:
order: "{{ taskrun.items }}" # special variable that contains the items of the batch
Et voici le Subflow associé :
id: orders
namespace: company.team
inputs:
- id: order
type: STRING
tasks:
- id: read_file
type: io.kestra.plugin.scripts.shell.Commands
runner: PROCESS
commands:
- cat "{{ inputs.order }}"
- id: read_file_content
type: io.kestra.plugin.core.log.Log
message: "{{ read(inputs.order) }}"
Gérez les erreurs
Si Julien a bien retenu une chose de ses expériences en data-engineering, c’est que les pipelines vont forcément tomber en erreur à un moment. Il est donc très important de gérer les cas d’erreurs dès la création de nos flux. Pour ce faire, Kestra offre deux types d'erreurs :
Global : Gestion d'erreurs globale d'un flux qui doit se trouver à la racine du flux.
Locale : Gestion d'erreurs locale d'une tâche Flowable, qui gérera les erreurs de la tâche Flowable et de ses enfants.
Gestion Global
Cet exemple de flux comporte une seule tâche Bash qui échoue immédiatement. La propriété globale errors
du flow sera alors appelée pour que la deuxième tâche puisse s'exécuter.
id: erreurs
namespace: open_class_room.kestra
tasks:
- id: failed
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.core.runner.Process
commands:
- exit 1
errors:
- id: 2nd
type: io.kestra.plugin.core.log.Log
message: Echec de la tâche {{task.id}}
level: INFO
Vous pouvez gérer votre stratégie d’erreurs en renseignant plusieurs tâches. On peut imaginer appeler un autre flow, envoyer un message, effectuer une opération spécifique, etc.
Gestion Locale
Avec la gestion locale, vous pouvez être plus précis et gérer les erreurs au niveau d’une tâche Flowable. Dans cet exemple, la branche d'erreur ne sera utilisée que si un enfant de la tâche t2 a une erreur. Si la tâche t1 échoue, la branche d'erreur ne sera pas utilisée :
id: errors
namespace: open_class_room.kestra
tasks:
- id: parent-seq
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: t1
type: io.kestra.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.startDate}}"
- id: t2
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: t2-t1
type: io.kestra.plugin.scripts.shell.Commands
taskRunner:
type: io.kestra.plugin.core.runner.Process
commands:
- 'exit 1'
errors:
- id: error-t1
type: io.kestra.plugin.core.debug.Return
format: "Error Trigger ! {{task.id}}"
Cela peut être utile pour restreindre la gestion des erreurs à une partie spécifique du flux.
Retry
Parfois les erreurs sont liées à des services ou comportements indépendants de notre contrôle. Par exemple, une API qui n’est plus disponible, un comportement non déterministe dans une application tierce, etc.
La propriété retry
gère ces échecs temporaires dans vos workflows. Elles sont définies au niveau de la tâche et peuvent être configurées pour réessayer une tâche un certain nombre de fois, ou avec un certain délai entre chaque tentative. L'exemple suivant définit une Retry pour la tâche retry-sample
avec un maximum de 5 tentatives toutes les 15 minutes :
- id: retry-sample
type: io.kestra.plugin.core.log.Log
message: my output for task {{task.id}}
timeout: PT10M
retry:
type: constant
maxAttempt: 5
interval: PT15M
D'autres types de Retries peuvent être utilisés, comme l'exponentiel et le random.
Les Retries peuvent aussi être paramétrées au niveau du flux - définissant des comportements de Retry par défaut pour chaque tâche du flux. Elles peuvent également être définies dans la configuration de l'instance Kestra pour configurer des Retries globalement pour toutes les tâches.
Nettoyez les artifacts Kestra
Les exécutions de flux créent des artifacts tels que des outputs, des fichiers dans le stockage interne ou des logsPour
ne pas surcharger votre base de données et votre stockage, il est important de nettoyer ces artefacts après un certain temps. Kestra propose des tâches de Purge, permettant de purger les stockages, les logs, les exécutions.
Deux tâches de purge existent :
storage.PurgeExecution
storage.Purge
Regardons ces deux tâches de plus près :
La tâche PurgeExecution
peut purger tous les fichiers stockés dans le contexte interne par une exécution. Elle peut être utilisée à la fin d'un flux pour purger tous les fichiers générés. Le contexte d'exécution lui-même ne sera pas disponible après la fin de l'exécution et sera automatiquement supprimé de Kestra après une période de rétention (par défaut, sept jours) qui peut être modifiée selon la configuration de la tâche.
La tâche storage.Purge
peut être utilisée pour purger les stockages, les logs, les exécutions d'une exécution précédente. Par exemple, ce flux purgera tout cela chaque jour :
id: purge
namespace: open_class_room.kestra
tasks:
- id: "purge"
type: "io.kestra.plugin.core.storage.Purge"
endDate: "{{ now() | dateAdd(-1, 'MONTHS') }}"
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 0 * * *"
En résumé
Kestra offre des tâches avancées permettant de contrôler vos workflow de façon précise : les subflow et les flowable de tâche en parallèle figurent parmi les plus importantes.
Les propriétés
errors
permettent de gérer les cas d’erreurs de façon personnalisée.Les propriétés
retry
permettent de relancer des tâches automatiquement en fonction d’une cadence paramétrée.Vous pouvez nettoyer et purger les artefacts créés par les exécutions avec les tâches de Purges.
Avec les erreurs et lesretry
, nous avons vu un premier élément de gestion d’incident dans Kestra. Voyons maintenant comment envoyer des messages d’alertes.