• 6 heures
  • Moyenne

Ce cours est visible gratuitement en ligne.

course.header.alt.is_certifying

J'ai tout compris !

Mis à jour le 03/12/2024

Déclenchez des Flows avec les Triggers

Une exécution peut se déclencher de façon manuelle, via l’interface graphique. Mais le cœur de l’automatisation réside dans le déclenchement automatique de nos flux !

Dans ce chapitre, nous allons explorer les différentes manières de déclencher un flow. Nous allons voir que plusieurs possibilités nous sont offertes par Kestra, nous permettant de nous adapter selon les besoins.

Utilisez Le Trigger de type Schedule

Les Schedule Triggers sont les éléments de base du déclenchement dans Kestra. Ils permettent de programmer l'exécution d'un workflow de manière régulière en se basant sur une expression Cron ou des conditions de programmation personnalisées.

De son côté, Julien souhaite exécuter son flux tous les jours à 9h00 le matin. Certains systèmes dépendent de flux plus tard dans la journée, ainsi Julien prévoit un peu en avance l’exécution de son flux. Nous le verrons plus tard, ce déclenchement en cascade n’est pas forcément le plus optimal.

Voici le Schedule trigger qu’il utilise :

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "* 9 * * *"

En data-engineering, il est assez courant d'effectuer des backfills. Un backfill est une exécution d’un flux avec un paramètre de date passé. C’est très utile, en particulier lorsqu'une nouvelle ingestion de données est nécessaire et que les données passées doivent également être ingérées. Dans Kestra, les Schedule triggers peuvent être “backfillés” via l'interface utilisateur.

Capture d'écran de l'onglet Triggers dans Kestra. Pour le Schedule trigger listé, la colonne Backfill indique « Backfill executions ».
La colonne "Backfill" dans la vue Triggers.

Créez des dépendances entre les Flow

L’esprit libre depuis qu’il a automatisé l’envoi de son email, Julien va pouvoir orchestrer d’autres tâches. Il a notamment besoin de déclencher un autre flux dès que l’email est envoyé. Pour cela, il va utiliser les “Flow Triggers”.

Julien va déclencher un flow dès que le Flow send_email aura terminé son exécution avec succès :

id: flow_downstream
namespace: company.team

tasks:
  - id: task_c
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }}"

triggers:

  - id: listen_send_email
    type: io.kestra.plugin.core.trigger.Flow
    conditions:
      - type: io.kestra.plugin.core.condition.ExecutionFlowCondition
        namespace: company.julien
        flowId: send_email
      - type: io.kestra.plugin.core.condition.ExecutionStatusCondition
        in:
          - SUCCESS

Utilisez les Triggers “Event-Driven”

Bien que l’envoi de l’email à 9h fonctionne dans la plupart des cas, Julien fait toujours face à un problème. En effet, lorsque le dataset n'est pas disponible à 9h, l'envoi programmé génère une erreur, interrompant ainsi l’automatisation.

Grâce aux Polling Triggers, Julien va pouvoir automatiser l'envoi de l'e-mail dès la disponibilité du fichier. Les Polling triggers sont un type de trigger fourni par les plugins. Ils permettent d'interroger un système externe pour vérifier la présence de données. Dans le cas où les données sont prêtes à être traitées, une exécution de workflow est lancée. Kestra fournit des polling triggers pour une grande variété de systèmes externes, par exemple : bases de données, brokers de messages, API, FTP, … Les polling triggers interrogent le système externe à un intervalle fixe défini par la propriété interval, les sorties du polling trigger seront disponibles pour le workflow déclenché dans la variable trigger. Par exemple, Julien peut utiliser le trigger http. Trigger lui permettant de vérifier si le fichier qu’il veut traiter est bien présent avant de lancer son flux.

id: http-trigger
namespace: company.julien

tasks:
  # ...

triggers:
  - id: http
    type: io.kestra.plugin.core.http.Trigger
    interval: P1D   
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv
    responseCondition: "{{ response.statusCode == 200 }}"

Ce type de trigger fonctionne avec de nombreux systèmes: déclenchement dès qu’un nouveau fichier est détecté dans un bucket S3, un FTP, etc. Mais aussi, déclenchement dès qu’un message arrive dans une queue Kakfa, dès qu’une requête sur une base de données retourne un résultat, etc.

Utilisez les Triggers de type Webhook

Jusqu’ici nous avons vu comment déclencher des flux depuis Kestra : de façon régulière avec les Schedule Trigger ou de façon événementielle avec les Flow Trigger ou les Polling Trigger. Mais comment déclencher une exécution via une application tierce ?

Les déclencheurs Webhook offrent une solution simple. Ils génèrent une URL unique qui, lorsqu'elle est appelée, démarre automatiquement votre flux. Dans le cas de Julien, cela pourrait lui permettre de déclencher son flux depuis une autre application. Afin d'utiliser l’url fournis par le webhook, vous devez ajouter une clé secrète qui sécurisera votre URL webhook :

id: webhook_example
namespace: company.team

tasks:
  - id: out
    type: io.kestra.plugin.core.debug.Return
    format: "{{ trigger | json }}"


triggers:
  - id: webhook_trigger
    type: io.kestra.plugin.core.trigger.Webhook
    # the required key to start this flow - might be passed as a secret
    key: 1KERKzRQZSMtLdMdNI7Nkr

Le format de l'URL Webhook est le suivant :

https://{hostname}/api/v1/executions/webhook/{namespace}/{flow_id}/{key}

Dans le cas de Julien, avec un namespace company.Julien et un flow webhook_flow, ce serait :

  • hostname est le domaine ou l'IP du serveur Kestra ;

  • namespace est company.Julien ;

  • flow_id est webhook_flow ;

  • key est 1KERKzRQZSMtLdMdNI7Nkr (remplacez par une clé secrète).

Avec ces informations, vous pouvez tester votre workflow en exécutant la commande suivante dans le terminal pour déclencher le workflow : curl http://localhost:8080/api/v1/executions/webhook/company.team/webhook_example/1KERKzRQZSMtLdMdNI7Nkr

À vous de jouer !

Vous allez configurer le Flow que vous avez créé pour qu’il se déclenche automatiquement toutes les 2 minutes grâce à un Trigger de type Schedule. En ajoutant un Trigger, vous rendrez votre Flow autonome. Il s’exécutera automatiquement à intervalle régulier, sans nécessiter d’intervention. 

Étapes :  
  1. Ajouter un Trigger au Flow précédent :

    • Accédez à la configuration de votre Flow.

    • Ajoutez un Trigger de typeSchedulepour définir la fréquence d’exécution.

  2. Configurer la fréquence :

    • Définissez un intervalle de 2 minutes pour exécuter automatiquement le Flow.

  3. Tester l’automatisation :

    • Lancez le Flow avec le Trigger configuré et observez les exécutions planifiées dans les vues d’exécution.

Le code ci-dessous présente la structure de base à compléter :

id: exercise1_1
namespace: open_class_room.kestra
description: In this exercise we add a schedule trigger to run the flow every 2 minutes.


# 1. Create an input allowing to pass a URL that will be processed by the flow tasks
inputs:

tasks:
  # 2. Download data
  # Use the ‘{{ inputs. }}' syntax to pass the URL to the `http.Download task`
  - id: download
    type: 
    uri: 

  # 3. Check if the http status code is successful or not (code 200)
  # Using the `io.kestra.plugin.core.flow.If` task, check if the previous request ended successfully with a code == 200.
  # Log two different messages upon condition.
  # Tips: 
  # - Each task expose outputs, you can access these ouptputs using Pebble syntax: '{{ outputs.task_name.exposed_value }}'
  # - Focus on a task and use the "Source & documentation" view to see the different examples and outputs exposed by a task.
  - id: check_code
    type: 
    condition: 
    then:
      - id: log_success
        type: 

    else:
      - id: log_error
        type: 

# 4. Add a schedule trigger running the flow every 2 minutes
# Tips:
# - You can create cron expression with https://crontab.guru/
triggers:
  - id: schedule
    type: 
    cron: 
    inputs:
      data: https://raw.githubusercontent.com/kestra-io/datasets/main/csv/salaries.csv

Pour rendre votre workflow encore plus robuste, ajoutez une condition supplémentaire : Le Flow ne démarrera que si l’exécution du Flow précédent a un statut SUCCESS.

Le code ci-dessous présente la structure de base à compléter :

id: exercise1_2
namespace: open_class_room.kestra
description: In this exercise we trigger the flow based on the state of an upstream flow.

labels:
  certification: fundamentals

tasks:

  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "Flow has been trigger - {{ taskrun.id}}"

# 1. Add conditions to trigger the flow only when the flow exercise1_1 in namespace open_class_room.kestra is in state SUCCESS
# Tips:
# - use the `io.kestra.plugin.core.condition.ExecutionFlowCondition` to describe the upstream flow condition
# - use the `io.kestra.plugin.core.condition.ExecutionStatusCondition` to check the execution state of upstream flow
# - check blueprints for inspiration
triggers:
  - id: listen-flow
    type: io.kestra.plugin.core.trigger.Flow
    conditions:
      - type: 
      - type:

En résumé

  • Les triggers permettent de déclencher des flows.

  • Il existe trois grandes familles de trigger : les schedule, les event-trigger et les webhook.

  • Il est possible de créer des dépendances entre les flows.

Vous savez maintenant utiliser Kestra de façon basique ! Dans la prochaine partie nous allons explorer l’utilisation de Kestra dans le cadre de projets plus avancés. Mais avant cela, testez vos connaissances avec le quiz !

Exemple de certificat de réussite
Exemple de certificat de réussite