Aller au contenu

Planifier l'exécution d'un DAG#

La planification de l'exécution des tâches est l'une des fonctionnalités phares d'Apache Airflow. Elle permet de définir quand et comment vos DAGs (Directed Acyclic Graphs) doivent s'exécuter, rendant vos workflows automatiques et fiables.

Cron#

Le cron est un outil natif des systèmes Linux/Unix permettant de planifier l’exécution automatique de tâches (scripts, commandes, programmes) à des moments précis ou à intervalles réguliers. Ces tâches sont définies dans un fichier appelé crontab (cron table), qui spécifie à quelle minute, heure, jour du mois, mois et jour de la semaine la commande doit être lancée. Ce système est très utile pour automatiser des routines telles que des sauvegardes, des mises à jour de données, ou l'envoi de rapports.

La syntaxe d'une expression cron est composée de cinq champs obligatoires (minute, heure, jour du mois, mois, jour de la semaine), suivis de la commande à exécuter. Par exemple, 0 2 * * * /path/to/script.sh exécute le script tous les jours à 2h00 du matin. Il est également possible d’utiliser des intervalles (*/5 pour toutes les 5 minutes), des plages (1-5) ou des valeurs multiples (1,15,30) pour affiner le déclenchement. Des plateformes comme Apache Airflow reprennent cette logique pour orchestrer les workflows, en la rendant plus flexible et observable.

Airflow#

Airflow utilise synthase des crons pour gérer la planification des dags. La fréquence d'exécution d'un DAG est définie par le paramètre schedule_interval. Si vous avez déjà utilisé un cron sur Linux, ce concept vous sera familier. Sinon, pas de panique, nous allons tout expliquer.

Airflow DAG
dga = DAG(
    dag_id="etl_sales",
    default_args=default_args,
    description="Sales ETL",
    schedule_interval="0 6 * * *",
    tags=["sales", "reporting", "daily", "critical"],
    catchup=False,
    max_active_runs=1,
)

Voici un exemple d'utilisation dans un DAG Airflow :

Airflow DAG
# Packages
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator

# Define default_args and DAG
default_args = {
    "owner": "joekakone",
    "start_date": datetime(2024, 12, 1), # Use today date to avoid undesirable reruns
    "retries": 1,
    "retry_delay": timedelta(minutes=15),
    'email': ["joseph.kakone@gmail.com"],
    'email_on_failure': True,
    'email_on_retry': False,
}

# Create DAG instance
with DAG(
    dag_id="etl_sales",
    default_args=default_args,
    description="Sales ETL",
    schedule_interval="0 6 * * *",
    tags=["sales", "reporting", "daily", "critical"],
    catchup=False,
    max_active_runs=1,
) as dag:

    # Tasks
    start_task = DummyOperator(task_id="start")

    sql_task = PostgresOperator(
        task_id="sql_task",
        postgres_conn_id="postgres",
        sql=f"select edw.fn_rpt_performance_product('{RUN_DATE}'::date)",
        dag=dag,
    )

    end_task = DummyOperator(task_id="end")

    # Flow
    start_task >> sql_task >>  end_task

La syntaxe du schedule_interval est basée sur le format cron, qui permet de spécifier des exécutions avec précision. Voici la signification des champs :

Champ Valeur Signification
Minute 0-59 Minute exacte
Heure 0-23 Heure exacte
Jour du mois 1-31 Jour du mois
Mois 1-12 Mois
Jour de la semaine 0-6 Dimanche (0) à Samedi (6)

Exemple pour 0 6 * * * :

  • Minute : 0 (pile à l'heure)
  • Heure : 6 (6 heures du matin)
  • Tous les jours du mois
  • Tous les mois
  • Tous les jours de la semaine

Voici un guide visuel pour vous aider à mieux comprendre :

Pour simplifier la création de ces expressions, vous pouvez visiter Crontab Guru, un outil en ligne qui vous aide à définir et tester vos expressions cron.

Pour finir voici quelques exemples qui répondent à d'autres besoins spécifiques : Valeur | Signification -|- @hourly | Chaque heure @daily | Chaque jour 0 */4 * * * | Chaque quatre heure */15 * * * * | Chaque 15 minutes


Grâce à schedule_interval, vous pouvez automatiser l'exécution de vos workflows selon les besoins de votre projet. Dans la prochaine section, nous verrons comment gérer les dépendances entre les tâches pour structurer vos DAGs de manière optimale..