Définir des dépendances

Dans Airflow, l'ordre d'exécution est primordial. On distingue deux types de dépendances : les dépendances intra-DAG (entre tâches d'un même fichier) et les dépendances inter-DAG (entre deux pipelines distincts).

Dépendances entre tâches (Intra-DAG)#

C'est la méthode la plus courante. On utilise les opérateurs >> (amont vers aval) et << (aval vers amont).

La tâche B attend que A réussisse#

task_a >> task_b

On peut aussi chaîner plusieurs tâches#

task_a >> [task_b, task_c] >> task_d

# La tâche B attend que A réussisse
task_a >> task_b

# On peut aussi chaîner plusieurs tâches
task_a >> [task_b, task_c] >> task_d

Dépendances entre DAGs (Inter-DAG)#

Parfois, votre pipeline de "Reporting" ne doit démarrer que si le pipeline "Ingestion" est terminé. Comme ce sont deux fichiers différents, l'opérateur >> ne fonctionne pas. C'est là qu'interviennent les Sensors.

L'ExternalTaskSensor#

Le ExternalTaskSensor est un capteur qui "écoute" l'état d'une tâche dans un autre DAG.

from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

check_kpis_etl = ExternalTaskSensor(
    task_id="check_previous_etl",
    external_dag_id="fecth_oltp_data", # Le DAG cible
    external_task_id="end", # La tâche spécifique à attendre
    execution_delta=timedelta(hours=0, minutes=40), # le delta entre les deux DAGs est de 40 minutes.
    mode="reschedule", # Mode 'reschedule' : libère le worker entre deux vérifications
    timeout=60, # Temps maximum d'attente avant d'échouer (en secondes)
)

Les règles de déclenchement (Trigger Rules)#

Par défaut, une tâche ne s'exécute que si toutes les tâches précédentes ont réussi (all_success). Mais vous pouvez modifier ce comportement :

  • all_done : La tâche s'exécute dès que les précédentes sont finies, peu importe le résultat (succès ou échec).
  • one_success : Dès qu'au moins une tâche parente réussit.
from airflow.models.trigger_rule import TriggerRule
from airflow.providers.standard.operators.python import PythonOperator

cleanup_task = PythonOperator(
    task_id="cleanup",
    python_callable=clean_temp_files,
    trigger_rule=TriggerRule.ALL_DONE # S'exécute même en cas d'erreur avant
)

Maîtriser les dépendances est la clé pour créer des systèmes de données résilients. Dans le module suivant, nous verrons comment gérer les variables et les connexions pour ne plus jamais écrire de mots de passe en dur dans vos codes.

Commentaires

Inscrivez-vous à une formation complète

Voir le catalogue