Nous l’avons vu, un Flow Kestra est caractérisé par un identifiant de flow, un namespace… et des tâches ! Dans ce chapitre, nous allons explorer les différents types de tâches, leurs propriétés et la façon dont nous pouvons les interconnecter.
Comprenez les Tasks
Les tâches sont des actions individuelles au sein d'un flux. Elles peuvent prendre des entrées et des variables, et produire des sorties destinées à être utilisées par d'autres tâches. Kestra supporte à travers ses plus de 400 plugins un grand nombre de tâches. Vous pouvez aussi créer vos propres tâches si vous avez des besoins spécifiques.
Découvrez les propriétés de Tasks
Toutes les tâches partagent un ensemble de propriétés communes. Par exemple, une tâche est toujours définie par un id et un type. L’id est le nom que vous donnez à votre tâche, le type correspond au type de plugin que vous utilisez. En fonction des tâches, vous pourrez définir plusieurs autres propriétés.
Prenons l’exemple d’un flux que Julien souhaite automatiser. Il doit envoyer un email contenant le résultat d’une agrégation d’un jeu de données. Pour ce faire, il va utiliser 3 tâches :
La première est la tâche http.Download, qui permet de télécharger un fichier depuis un serveur HTTP. Elle prend en paramètre une
uri
permettant de spécifier l’adresse du fichier que l’on veut télécharger. Ci-dessous, vous trouverez un exemple permettant de télécharger un fichier CSV depuis une adresse HTTP :- id: extract type: io.kestra.plugin.core.http.Download uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv
Avec une seconde tâche, Julien va exécuter une requête SQL avec DuckDB pour agréger les données téléchargées. La propriété
inputFiles
permet d’intégrer des fichiers en entrée de la tâche. La propriétésql
permet d’écrire la requête SQL que l’on veut exécuter. La propriétéstore
permet de spécifier la façon dont le résultat de la requête sera géré par Kestra.- id: query type: io.kestra.plugin.jdbc.duckdb.Query inputFiles: data.csv: "{{ outputs.extract.uri }}" sql: | SELECT x, AVG(y) FROM read_csv_auto('{{workingDir}}/data.csv'); store: true
Enfin, Julien va utiliser la tâche
mail.MailSend
pour envoyer un email avec le résultat de sa requête sous forme de fichier. De la même façon, cette tâche prend plusieurs propriétés permettant de spécifier les configurations du serveur qui enverra l’email ainsi que le contenu du message et son destinataire.- id: send_email type: io.kestra.plugin.notifications.mail.MailSend from: Julien@company.com to: ceo@company.com username: Julien password: xxxxx host: smtp.resend.com port: 465 subject: "Here is your dataset" attachments: - name: data.csv uri: "{{ outputs.query.uri }}" htmlTextContent: "Please find attached your dataset as a CSV file"
Comparez les Runnables et les Flowables
Globalement, il existe deux types de tâches : les tâches exécutables et les tâches de flux (autrement dit, les “Runnables” et les “Flowables” en anglais).
Tâche de flux (Flowable Task) : Kestra orchestre vos flux à l'aide des tâches de flux. Celles-ci contrôlent le comportement d'orchestration, vous permettant d'implémenter des modèles de flux plus avancés. Elles n'effectuent aucun calcul lourd. Exemples de tâches de flux :
io.kestra.plugin.core.flow.Parallel:
permet d’exécuter des tâches en parallèle ;io.kestra.plugin.core.flow.Switch:
permet de créer des branches selon de multiples conditions.
Tâche exécutable (Runnable Task) : Dans Kestra, la plupart des charges de traitement de données sont exécutées à l'aide de tâches exécutables. Contrairement aux tâches de flux, les tâches exécutables sont responsables de l'exécution du process réel. Par exemple, les opérations sur le système de fichiers, les appels d'API, les requêtes de base de données, etc. Exemples de tâches exécutables :
io.kestra.plugin.scripts.python.Commands:
permet d’exécuter un script Python ;io.kestra.plugin.core.http.Request:
permet d’exécuter une requête HTTP ;io.kestra.plugin.notifications.slack.SlackExecution:
permet d’envoyer un message via Slack.
Découvrez les avantages de l’éditeur Kestra
L'interface utilisateur de Kestra vous permet d'accéder directement à la documentation des tâches. En sélectionnant simplement une tâche dans le code YAML, vous pouvez voir des exemples et la documentation correspondante à proximité de votre code.
Il s’agit d’un atout majeur : dans notre quotidien d’ingénieur, on se retrouve souvent à devoir jongler entre la documentation d’un outil, poser une question sur un channel Slack, regarder des réponses sur StackOverflow ou même demander à ChatGPT de nous aider. Devoir changer de contexte est fatigant et réduit nos capacités de concentration. Avoir tout au même endroit dans l’interface Kestra permet de réduire cette charge cognitive et de se concentrer sur ce qui compte : la création de votre Flow !
L'éditeur de Kestra offre une expérience maximale grâce à la saisie automatique des propriétés des tâches lorsque vous tapez sur votre clavier.
Regardons cela ensemble dans la vidéo suivante :
Utilisez les Outputs
Vous l’avez peut-être déjà compris : les tâches peuvent transmettre des informations à d’autres tâches. Cela se fait avec la notion d’Outputs. Comme pour les propriétés, chaque tâche expose des outputs. Les Outputs sont stockées dans le contexte d'exécution du flux (c'est-à-dire en mémoire) et peuvent être utilisées par toutes les tâches et flux en aval. Regardons cela plus en détail.
Expressions Pebble
Kestra utilise le moteur de template Pebble pour effectuer un rendu dynamique des variables, des inputs et des outputs dans le contexte d'exécution. Pour les développeurs Python comme Julien, il s’agit d’un moteur similaire au package Jinja.
Exemple :
Nous utilisons ici la tâche Return qui possède un attribut d’outputs nommé value
.
id: task_outputs_example
namespace: company.team
tasks:
- id: produce_output
type: io.kestra.plugin.core.debug.Return
format: 42
- id: use-output
type: io.kestra.plugin.core.log.Log
message: The previous task output is {{ outputs.produce_output.value }}
Dans l'exemple ci-dessus, la première tâche produit une Outputs basée sur la propriété de tâcheformat
. Cet attribut de sortie est ensuite utilisé dans la propriété message
de la deuxième tâche.
L'expression Pebble {{ outputs.product_outputs.value }}
fait référence à l’outputs de la tâche précédente. Vous pouvez en savoir plus sur la syntaxe des expressions sur la page Utilisation des expressions.
Autres expressions
Le moteur de template Pebble est très complet et permet un grand nombre d’opérations. Vous pouvez aussi utiliser les expressions jq pour parser des outputs complexes ou encore utiliser les fonctions de “date” pour formater des dates.
Stockage interne et abstraction
Selon la tâche, les données peuvent être stockées dans le contexte d'exécution (les outputs) ou dans le stockage interne de Kestra.
Contexte d'exécution
Nous l’avons vu précédemment : il s’agit des outputs. Les données peuvent être stockées sous forme de variables dans le contexte d'exécution du flux. Cela peut être pratique pour partager des données entre les tâches.
Stockage interne
Kestra dispose d'un stockage interne capable de stocker des données de toute taille. Par défaut, le stockage interne utilise le système de fichiers de votre instance Kestra, mais des plugins existent pour utiliser d'autres implémentations comme Amazon S3, Google Cloud Storage ou Microsoft Azure Blobs Storage. Lors de l'utilisation du stockage interne, les données sont stockées par défaut au format Ion, un format très proche du JSON.
Les tâches pouvant stocker des données dans le stockage interne ont généralement un output nommé uri
qui peut être utilisé pour accéder à ce fichier dans les tâches suivantes, limitant ainsi l’usage du contexte d’exécution (outputs) potentiellement coûteux en mémoire.
À vous de jouer !
Nous avons maintenant tous les éléments pour créer notre premier Flow !
Dans cet exercice, nous allons télécharger un fichier et vérifier si la réponse du serveur est valide ou non.
Utiliser la task de type
http.Download
pour télécharger le dataset.En utilisant une task de type
flows.If
, vérifier si le code de réponse de la tâche précédente est bien en “200” ou non. Affichez un message différent selon la condition.
Voici quelques éléments qui pourront vous aider :
Utiliser l’auto-complétion dans l’éditeur.
Utiliser la documentation dans l’éditeur pour découvrir les différents exemples et propriétés des tâches.
Voici l'exercice :
id: exercise1
namespace: open_class_room.kestra
description: In this exercise you will see how to download a data file and use a flowable task.
tasks:
# 1. Download data
# A csv file containing salaries for several data jobs can be found at https://raw.githubusercontent.com/kestra-io/datasets/main/csv/salaries.csv
# Use the `http.Download` task type to download the file. See how the auto-completion work when you start typing htt... (you can enable auto-completion with CTRL+SPACE)
- id: download
type:
uri:
# 2. Check if the http status code is successful or not (code 200)
# Using the `io.kestra.plugin.core.flow.If` task, check if the previous download request ended successfully with a code == 200.
# Log two different messages upon condition.
# Tips:
# - Each task expose outputs, you can access these ouptputs using Pebble syntax: '{{ outputs.task_name.exposed_value }}'
# - Focus on a task and use the "Source & documentation" view to see the different examples and outputs exposed by a task.
- id: check_code
type:
condition:
then:
- id: log_success
type:
else:
- id: log_error
type:
En résumé
Les tâches sont les éléments fondamentaux d’un flow.
Chaque tâche est définie par un type (plugin) et comporte plusieurs propriétés.
Il existe deux types de tâches différentes : les flowable et les runnable.
L’éditeur de code directement inclus dans Kestra permet de réduire la charge cognitive lors du développement.
Les tâches exposent des meta-data via les Outputs.
Les outputs permettent de faire transiter des informations entre les tâches et les flux.
Vous pouvez manipuler les outputs et faire certaines transformations basiques avec la syntaxe de templating Peeble.
Vous savez maintenant créer un flow et décrire des tâches ! Dans le chapitre suivant nous allons voir comment exécuter et monitorer un flow.