Depuis l'interface graphique (UI)#
C'est le moyen le plus simple et le plus visuel pour intervenir sur vos exécutions.
Déclencher manuellement#
Dans l'onglet DAGs, vous trouverez un bouton "Play" à droite.
- Trigger DAG : Lance le DAG avec la date actuelle.
- Trigger DAG w/ config : Vous permet de passer des paramètres JSON ou de choisir une
logical datespécifique pour simuler un jour précis.
Réexécuter des tâches (Clear)#
Si une tâche échoue, vous n'avez pas besoin de relancer tout le pipeline.
- Cliquez sur le carré de la tâche échouée (souvent rouge).
- Cliquez sur le bouton Clear.
- Airflow va "effacer" l'état de la tâche, ce qui la remettra en file d'attente pour exécution. Vous pouvez choisir d'inclure les tâches en amont (upstream) ou en aval (downstream).
Depuis l'invite de commandes (CLI)#
La ligne de commande est indispensable pour les opérations de masse, comme le rattrapage d'un historique complet sur plusieurs mois.
Accéder au conteneur#
Si vous utilisez Docker, vous devez d'abord "entrer" dans le conteneur du serveur web ou du scheduler :
Le Backfill : Rattraper l'historique#
La commande backfill permet de lancer un DAG sur une plage de dates. Airflow créera automatiquement toutes les instances nécessaires entre la date de début et de fin.
airflow@6951a05100d8:/opt/airflow$ airflow dags backfill --start-date 2024-09-02 --end-date 2024-09-23 dag_id
/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py:133 RemovedInAirflow3Warning: --ignore-first-depends-on-past is deprecated as the value is always set to True
[2024-09-25T23:17:23.931+0000] {dagbag.py:538} INFO - Filling up the DagBag from /opt/airflow/dags
[2024-09-25T23:17:24.862+0000] {executor_loader.py:115} INFO - Loaded executor: CeleryExecutor
[2024-09-25T23:17:25.234+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dag_id', 'start', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dag_id.py']
[2024-09-25T23:17:30.189+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 9 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 9
[2024-09-25T23:17:34.983+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 9 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 9
[2024-09-25T23:17:35.003+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dag_id', 'rech_1d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dag_id.py']
[2024-09-25T23:17:35.036+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dag_id', 'rech_90d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dag_id.py']
[2024-09-25T23:17:40.130+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 7 | succeeded: 1 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2024-09-25T23:17:44.993+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 7 | succeeded: 1 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2024-09-25T23:17:50.005+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 7 | succeeded: 2 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2024-09-25T23:17:50.027+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dag_id', 'rech_7d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dag_id.py']
[2024-09-25T23:17:55.046+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:00.028+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:05.028+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:10.051+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:15.054+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:20.051+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:25.069+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 3 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:25.091+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dag_id', 'rech_30d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dag_id.py']
Option utile : * --reset-dagruns : Supprime les exécutions existantes pour ces dates avant de les relancer.
Tester une tâche (Airflow Tasks Test)#
C'est l'outil préféré des développeurs. La commande test exécute une tâche localement sans enregistrer d'état dans la base de données et sans vérifier les dépendances. C'est idéal pour déboguer le code Python.
[2025-06-26T09:33:18.492+0000] {manager.py:147} INFO - DAG bundles loaded: dags-folder
[2025-06-26T09:33:18.958+0000] {dagbag.py:573} INFO - Filling up the DagBag from /opt/airflow/dags
[2025-06-26T09:33:21.618+0000] {taskinstance.py:1311} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: etl_ume_datamart.bulk_insert_sellout_day __airflow_temporary_run_2025-06-26T09:33:21.506585+00:00__ [None]>
[2025-06-26T09:33:21.627+0000] {taskinstance.py:1311} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: etl_ume_datamart.bulk_insert_sellout_day __airflow_temporary_run_2025-06-26T09:33:21.506585+00:00__ [None]>
[2025-06-26T09:33:21.627+0000] {taskinstance.py:1548} INFO - Starting attempt 0 of 3
[2025-06-26T09:33:21.628+0000] {taskinstance.py:1627} WARNING - cannot record queued_duration for task bulk_insert_sellout_day because previous state change time has not been saved
[2025-06-26T09:33:21.630+0000] {taskinstance.py:1571} INFO - Executing <Task(PythonOperator): bulk_insert_sellout_day> on 2025-01-05 00:00:00+00:00
[2025-06-26T09:33:21.826+0000] {taskinstance.py:2044} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='Joseph.KONKA@YAS.TG' AIRFLOW_CTX_DAG_OWNER='jkonka' AIRFLOW_CTX_DAG_ID='etl_ume_datamart' AIRFLOW_CTX_TASK_ID='bulk_insert_sellout_day' AIRFLOW_CTX_LOGICAL_DATE='2025-01-05T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2025-06-26T09:33:21.506585+00:00__'
[2025-06-26T09:33:21.830+0000] {taskinstance.py:2158} INFO - ::endgroup::
airflow tasks test n’enregistre pas de DAG Run officiel dans l’interface, contrairement au backfill.
Automatiser les tests avec Python#
Parfois, vous voulez tester une tâche sur une longue période sans utiliser le mécanisme de backfill (pour éviter de polluer l'historique de production). Vous pouvez utiliser un petit script Python pour boucler sur les dates :
from datetime import date, timedelta
import os
dag_id = "etl_ume_datamart"
task_id = "bulk_insert_sellout_day"
start = date(2025, 1, 2)
end = date(2025, 1, 10)
delta = timedelta(days=1)
d = start
while d <= end:
# On construit la commande CLI
cmd = f"airflow tasks test {dag_id} {task_id} {d}"
print(f"Exécution pour la date : {d}")
# Exécution de la commande système
os.system(cmd)
d += delta
Félicitations ! Vous savez maintenant comment piloter vos pipelines à la main. Dans la prochaine section, nous verrons comment surveiller la santé de vos jobs et configurer des alertes en cas d'erreur.
Commentaires