Une exécution peut se déclencher de façon manuelle, via l’interface graphique. Mais le cœur de l’automatisation réside dans le déclenchement automatique de nos flux !
Dans ce chapitre, nous allons explorer les différentes manières de déclencher un flow. Nous allons voir que plusieurs possibilités nous sont offertes par Kestra, nous permettant de nous adapter selon les besoins.
Utilisez Le Trigger de type Schedule
Les Schedule Triggers sont les éléments de base du déclenchement dans Kestra. Ils permettent de programmer l'exécution d'un workflow de manière régulière en se basant sur une expression Cron ou des conditions de programmation personnalisées.
De son côté, Julien souhaite exécuter son flux tous les jours à 9h00 le matin. Certains systèmes dépendent de flux plus tard dans la journée, ainsi Julien prévoit un peu en avance l’exécution de son flux. Nous le verrons plus tard, ce déclenchement en cascade n’est pas forcément le plus optimal.
Voici le Schedule trigger qu’il utilise :
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "* 9 * * *"
En data-engineering, il est assez courant d'effectuer des backfills. Un backfill est une exécution d’un flux avec un paramètre de date passé. C’est très utile, en particulier lorsqu'une nouvelle ingestion de données est nécessaire et que les données passées doivent également être ingérées. Dans Kestra, les Schedule triggers peuvent être “backfillés” via l'interface utilisateur.
Créez des dépendances entre les Flow
L’esprit libre depuis qu’il a automatisé l’envoi de son email, Julien va pouvoir orchestrer d’autres tâches. Il a notamment besoin de déclencher un autre flux dès que l’email est envoyé. Pour cela, il va utiliser les “Flow Triggers”.
Julien va déclencher un flow dès que le Flow send_email
aura terminé son exécution avec succès :
id: flow_downstream
namespace: company.team
tasks:
- id: task_c
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }}"
triggers:
- id: listen_send_email
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type: io.kestra.plugin.core.condition.ExecutionFlowCondition
namespace: company.julien
flowId: send_email
- type: io.kestra.plugin.core.condition.ExecutionStatusCondition
in:
- SUCCESS
Utilisez les Triggers “Event-Driven”
Bien que l’envoi de l’email à 9h fonctionne dans la plupart des cas, Julien fait toujours face à un problème. En effet, lorsque le dataset n'est pas disponible à 9h, l'envoi programmé génère une erreur, interrompant ainsi l’automatisation.
Grâce aux Polling Triggers, Julien va pouvoir automatiser l'envoi de l'e-mail dès la disponibilité du fichier. Les Polling triggers sont un type de trigger fourni par les plugins. Ils permettent d'interroger un système externe pour vérifier la présence de données. Dans le cas où les données sont prêtes à être traitées, une exécution de workflow est lancée. Kestra fournit des polling triggers pour une grande variété de systèmes externes, par exemple : bases de données, brokers de messages, API, FTP, … Les polling triggers interrogent le système externe à un intervalle fixe défini par la propriété interval, les sorties du polling trigger seront disponibles pour le workflow déclenché dans la variable trigger. Par exemple, Julien peut utiliser le trigger http. Trigger lui permettant de vérifier si le fichier qu’il veut traiter est bien présent avant de lancer son flux.
id: http-trigger
namespace: company.julien
tasks:
# ...
triggers:
- id: http
type: io.kestra.plugin.core.http.Trigger
interval: P1D
uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv
responseCondition: "{{ response.statusCode == 200 }}"
Ce type de trigger fonctionne avec de nombreux systèmes: déclenchement dès qu’un nouveau fichier est détecté dans un bucket S3, un FTP, etc. Mais aussi, déclenchement dès qu’un message arrive dans une queue Kakfa, dès qu’une requête sur une base de données retourne un résultat, etc.
Utilisez les Triggers de type Webhook
Jusqu’ici nous avons vu comment déclencher des flux depuis Kestra : de façon régulière avec les Schedule Trigger ou de façon événementielle avec les Flow Trigger ou les Polling Trigger. Mais comment déclencher une exécution via une application tierce ?
Les déclencheurs Webhook offrent une solution simple. Ils génèrent une URL unique qui, lorsqu'elle est appelée, démarre automatiquement votre flux. Dans le cas de Julien, cela pourrait lui permettre de déclencher son flux depuis une autre application. Afin d'utiliser l’url fournis par le webhook, vous devez ajouter une clé secrète qui sécurisera votre URL webhook :
id: webhook_example
namespace: company.team
tasks:
- id: out
type: io.kestra.plugin.core.debug.Return
format: "{{ trigger | json }}"
triggers:
- id: webhook_trigger
type: io.kestra.plugin.core.trigger.Webhook
# the required key to start this flow - might be passed as a secret
key: 1KERKzRQZSMtLdMdNI7Nkr
Le format de l'URL Webhook est le suivant :
https://{hostname}/api/v1/executions/webhook/{namespace}/{flow_id}/{key}
Dans le cas de Julien, avec un namespace company.Julien
et un flow webhook_flow
, ce serait :
hostname
est le domaine ou l'IP du serveur Kestra ;namespace
estcompany.Julien
;flow_id
estwebhook_flow
;key est
1KERKzRQZSMtLdMdNI7Nkr
(remplacez par une clé secrète).
Avec ces informations, vous pouvez tester votre workflow en exécutant la commande suivante dans le terminal pour déclencher le workflow : curl http://localhost:8080/api/v1/executions/webhook/company.team/webhook_example/1KERKzRQZSMtLdMdNI7Nkr
À vous de jouer !
Vous allez configurer le Flow que vous avez créé pour qu’il se déclenche automatiquement toutes les 2 minutes grâce à un Trigger de type Schedule. En ajoutant un Trigger, vous rendrez votre Flow autonome. Il s’exécutera automatiquement à intervalle régulier, sans nécessiter d’intervention.
Étapes :
Ajouter un Trigger au Flow précédent :
Accédez à la configuration de votre Flow.
Ajoutez un Trigger de type
Schedule
pour définir la fréquence d’exécution.
Configurer la fréquence :
Définissez un intervalle de 2 minutes pour exécuter automatiquement le Flow.
Tester l’automatisation :
Lancez le Flow avec le Trigger configuré et observez les exécutions planifiées dans les vues d’exécution.
Le code ci-dessous présente la structure de base à compléter :
id: exercise1_1
namespace: open_class_room.kestra
description: In this exercise we add a schedule trigger to run the flow every 2 minutes.
# 1. Create an input allowing to pass a URL that will be processed by the flow tasks
inputs:
tasks:
# 2. Download data
# Use the ‘{{ inputs. }}' syntax to pass the URL to the `http.Download task`
- id: download
type:
uri:
# 3. Check if the http status code is successful or not (code 200)
# Using the `io.kestra.plugin.core.flow.If` task, check if the previous request ended successfully with a code == 200.
# Log two different messages upon condition.
# Tips:
# - Each task expose outputs, you can access these ouptputs using Pebble syntax: '{{ outputs.task_name.exposed_value }}'
# - Focus on a task and use the "Source & documentation" view to see the different examples and outputs exposed by a task.
- id: check_code
type:
condition:
then:
- id: log_success
type:
else:
- id: log_error
type:
# 4. Add a schedule trigger running the flow every 2 minutes
# Tips:
# - You can create cron expression with https://crontab.guru/
triggers:
- id: schedule
type:
cron:
inputs:
data: https://raw.githubusercontent.com/kestra-io/datasets/main/csv/salaries.csv
Pour rendre votre workflow encore plus robuste, ajoutez une condition supplémentaire : Le Flow ne démarrera que si l’exécution du Flow précédent a un statut SUCCESS.
Le code ci-dessous présente la structure de base à compléter :
id: exercise1_2
namespace: open_class_room.kestra
description: In this exercise we trigger the flow based on the state of an upstream flow.
labels:
certification: fundamentals
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "Flow has been trigger - {{ taskrun.id}}"
# 1. Add conditions to trigger the flow only when the flow exercise1_1 in namespace open_class_room.kestra is in state SUCCESS
# Tips:
# - use the `io.kestra.plugin.core.condition.ExecutionFlowCondition` to describe the upstream flow condition
# - use the `io.kestra.plugin.core.condition.ExecutionStatusCondition` to check the execution state of upstream flow
# - check blueprints for inspiration
triggers:
- id: listen-flow
type: io.kestra.plugin.core.trigger.Flow
conditions:
- type:
- type:
En résumé
Les triggers permettent de déclencher des flows.
Il existe trois grandes familles de trigger : les schedule, les event-trigger et les webhook.
Il est possible de créer des dépendances entre les flows.
Vous savez maintenant utiliser Kestra de façon basique ! Dans la prochaine partie nous allons explorer l’utilisation de Kestra dans le cadre de projets plus avancés. Mais avant cela, testez vos connaissances avec le quiz !