• 6 heures
  • Moyenne

Ce cours est visible gratuitement en ligne.

course.header.alt.is_certifying

J'ai tout compris !

Mis à jour le 03/12/2024

Déclarez des Tasks

Nous l’avons vu, un Flow Kestra est caractérisé par un identifiant de flow, un namespace… et des tâches ! Dans ce chapitre, nous allons explorer les différents types de tâches, leurs propriétés et la façon dont nous pouvons les interconnecter.

Comprenez les Tasks

Les tâches sont des actions individuelles au sein d'un flux. Elles peuvent prendre des entrées et des variables, et produire des sorties destinées à être utilisées par d'autres tâches. Kestra supporte à travers ses plus de 400 plugins un grand nombre de tâches. Vous pouvez aussi créer vos propres tâches si vous avez des besoins spécifiques.

Découvrez les propriétés de Tasks

Toutes les tâches partagent un ensemble de propriétés communes. Par exemple, une tâche est toujours définie par un id et un type. L’id est le nom que vous donnez à votre tâche, le type correspond au type de plugin que vous utilisez. En fonction des tâches, vous pourrez définir plusieurs autres propriétés.

Prenons l’exemple d’un flux que Julien souhaite automatiser. Il doit envoyer un email contenant le résultat d’une agrégation d’un jeu de données. Pour ce faire, il va utiliser 3 tâches :

  1. La première est la tâche http.Download, qui permet de télécharger un fichier depuis un serveur HTTP. Elle prend en paramètre une uri permettant de spécifier l’adresse du fichier que l’on veut télécharger. Ci-dessous, vous trouverez un exemple permettant de télécharger un fichier CSV depuis une adresse HTTP :

    - id: extract
      type: io.kestra.plugin.core.http.Download
      uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv
  2. Avec une seconde tâche, Julien va exécuter une requête SQL avec DuckDB pour agréger les données téléchargées. La propriété inputFiles permet d’intégrer des fichiers en entrée de la tâche. La propriété sql permet d’écrire la requête SQL que l’on veut exécuter. La propriété store permet de spécifier la façon dont le résultat de la requête sera géré par Kestra.

    - id: query
      type: io.kestra.plugin.jdbc.duckdb.Query
      inputFiles:
        data.csv: "{{ outputs.extract.uri }}"
      sql: |
        SELECT x, AVG(y) 
        FROM read_csv_auto('{{workingDir}}/data.csv');
      store: true
  3. Enfin, Julien va utiliser la tâche mail.MailSend pour envoyer un email avec le résultat de sa requête sous forme de fichier. De la même façon, cette tâche prend plusieurs propriétés permettant de spécifier les configurations du serveur qui enverra l’email ainsi que le contenu du message et son destinataire.

    - id: send_email
      type: io.kestra.plugin.notifications.mail.MailSend
      from: Julien@company.com
      to: ceo@company.com
      username: Julien
      password: xxxxx
      host: smtp.resend.com
      port: 465
      subject: "Here is your dataset"
      attachments:
        - name: data.csv
          uri: "{{ outputs.query.uri }}"
      htmlTextContent: "Please find attached your dataset as a CSV file"

Comparez les Runnables et les Flowables

Globalement, il existe deux types de tâches : les tâches exécutables et les tâches de flux (autrement dit, les “Runnables” et les “Flowables” en anglais).

  • Tâche de flux (Flowable Task) : Kestra orchestre vos flux à l'aide des tâches de flux. Celles-ci contrôlent le comportement d'orchestration, vous permettant d'implémenter des modèles de flux plus avancés. Elles n'effectuent aucun calcul lourd. Exemples de tâches de flux :
    io.kestra.plugin.core.flow.Parallel: permet d’exécuter des tâches en parallèle ;
    io.kestra.plugin.core.flow.Switch:permet de créer des branches selon de multiples conditions.

  • Tâche exécutable (Runnable Task) : Dans Kestra, la plupart des charges de traitement de données sont exécutées à l'aide de tâches exécutables. Contrairement aux tâches de flux, les tâches exécutables sont responsables de l'exécution du process réel. Par exemple, les opérations sur le système de fichiers, les appels d'API, les requêtes de base de données, etc. Exemples de tâches exécutables :
    io.kestra.plugin.scripts.python.Commands: permet d’exécuter un script Python ;
    io.kestra.plugin.core.http.Request: permet d’exécuter une requête HTTP ;
    io.kestra.plugin.notifications.slack.SlackExecution:permet d’envoyer un message via Slack.

Découvrez les avantages de l’éditeur Kestra

L'interface utilisateur de Kestra vous permet d'accéder directement à la documentation des tâches. En sélectionnant simplement une tâche dans le code YAML, vous pouvez voir des exemples et la documentation correspondante à proximité de votre code.

Il s’agit d’un atout majeur : dans notre quotidien d’ingénieur, on se retrouve souvent à devoir jongler entre la documentation d’un outil, poser une question sur un channel Slack, regarder des réponses sur StackOverflow ou même demander à ChatGPT de nous aider. Devoir changer de contexte est fatigant et réduit nos capacités de concentration. Avoir tout au même endroit dans l’interface Kestra permet de réduire cette charge cognitive et de se concentrer sur ce qui compte : la création de votre Flow !

L'éditeur de Kestra offre une expérience maximale grâce à la saisie automatique des propriétés des tâches lorsque vous tapez sur votre clavier.

Regardons cela ensemble dans la vidéo suivante :

Utilisez les Outputs

Vous l’avez peut-être déjà compris : les tâches peuvent transmettre des informations à d’autres tâches. Cela se fait avec la notion d’Outputs. Comme pour les propriétés, chaque tâche expose des outputs. Les Outputs sont stockées dans le contexte d'exécution du flux (c'est-à-dire en mémoire) et peuvent être utilisées par toutes les tâches et flux en aval. Regardons cela plus en détail.

Expressions Pebble

Kestra utilise le moteur de template Pebble pour effectuer un rendu dynamique des variables, des inputs et des outputs dans le contexte d'exécution. Pour les développeurs Python comme Julien, il s’agit d’un moteur similaire au package Jinja.

Exemple :

Nous utilisons ici la tâche Return qui possède un attribut d’outputs nommé value.

id: task_outputs_example
namespace: company.team
tasks:
  - id: produce_output
    type: io.kestra.plugin.core.debug.Return
    format: 42

  - id: use-output
    type: io.kestra.plugin.core.log.Log
    message: The previous task output is {{ outputs.produce_output.value }}

Dans l'exemple ci-dessus, la première tâche produit une Outputs basée sur la propriété de tâcheformat. Cet attribut de sortie est ensuite utilisé dans la propriété message de la deuxième tâche.

L'expression Pebble {{ outputs.product_outputs.value }} fait référence à l’outputs de la tâche précédente. Vous pouvez en savoir plus sur la syntaxe des expressions sur la page Utilisation des expressions.

Autres expressions

Le moteur de template Pebble est très complet et permet un grand nombre d’opérations. Vous pouvez aussi utiliser les expressions jq pour parser des outputs complexes ou encore utiliser les fonctions de “date” pour formater des dates.

Stockage interne et abstraction

Selon la tâche, les données peuvent être stockées dans le contexte d'exécution (les outputs) ou dans le stockage interne de Kestra.

Contexte d'exécution

Nous l’avons vu précédemment : il s’agit des outputs. Les données peuvent être stockées sous forme de variables dans le contexte d'exécution du flux. Cela peut être pratique pour partager des données entre les tâches.

Stockage interne

Kestra dispose d'un stockage interne capable de stocker des données de toute taille. Par défaut, le stockage interne utilise le système de fichiers de votre instance Kestra, mais des plugins existent pour utiliser d'autres implémentations comme Amazon S3, Google Cloud Storage ou Microsoft Azure Blobs Storage. Lors de l'utilisation du stockage interne, les données sont stockées par défaut au format Ion, un format très proche du JSON.

Les tâches pouvant stocker des données dans le stockage interne ont généralement un output nommé uri qui peut être utilisé pour accéder à ce fichier dans les tâches suivantes, limitant ainsi l’usage du contexte d’exécution (outputs) potentiellement coûteux en mémoire.

À vous de jouer !

Nous avons maintenant tous les éléments pour créer notre premier Flow !

Dans cet exercice, nous allons télécharger un fichier et vérifier si la réponse du serveur est valide ou non.

  1. Utiliser la task de type http.Download pour télécharger le dataset.

  2. En utilisant une task de type flows.If, vérifier si le code de réponse de la tâche précédente est bien en “200” ou non. Affichez un message différent selon la condition.

Voici quelques éléments qui pourront vous aider :

  • Utiliser l’auto-complétion dans l’éditeur.

  • Utiliser la documentation dans l’éditeur pour découvrir les différents exemples et propriétés des tâches.

Voici l'exercice :

id: exercise1
namespace: open_class_room.kestra
description: In this exercise you will see how to download a data file and use a flowable task.

tasks:

  # 1. Download data
  # A csv file containing salaries for several data jobs can be found at https://raw.githubusercontent.com/kestra-io/datasets/main/csv/salaries.csv
  # Use the `http.Download` task type to download the file. See how the auto-completion work when you start typing htt... (you can enable auto-completion with CTRL+SPACE)
  - id: download
    type: 
    uri: 

  # 2. Check if the http status code is successful or not (code 200)
  # Using the `io.kestra.plugin.core.flow.If` task, check if the previous download request ended successfully with a code == 200.
  # Log two different messages upon condition.
  # Tips: 
  # - Each task expose outputs, you can access these ouptputs using Pebble syntax: '{{ outputs.task_name.exposed_value }}'
  # - Focus on a task and use the "Source & documentation" view to see the different examples and outputs exposed by a task.
  - id: check_code
    type: 
    condition: 
    then:
      - id: log_success
        type: 

    else:
      - id: log_error
        type: 

En résumé

  • Les tâches sont les éléments fondamentaux d’un flow.

  • Chaque tâche est définie par un type (plugin) et comporte plusieurs propriétés.

  • Il existe deux types de tâches différentes : les flowable et les runnable.

  • L’éditeur de code directement inclus dans Kestra permet de réduire la charge cognitive lors du développement.

  • Les tâches exposent des meta-data via les Outputs.

  • Les outputs permettent de faire transiter des informations entre les tâches et les flux.

  • Vous pouvez manipuler les outputs et faire certaines transformations basiques avec la syntaxe de templating Peeble.

Vous savez maintenant créer un flow et décrire des tâches ! Dans le chapitre suivant nous allons voir comment exécuter et monitorer un flow.

Exemple de certificat de réussite
Exemple de certificat de réussite