Airflow offers several ways to run jobs, either from the graphical interface or from the command line.
The button on the right lets you trigger a run.
You can also select a task and re-run it, either on its own or together with its downstream tasks.
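Triggering is not limited to the UI: the same thing can be done from the command line with airflow dags trigger. A minimal example, assuming a DAG named your_dag_id:

airflow dags trigger your_dag_id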
If you need to catch up on historical data, you can do it from the command line. To open a shell inside the container, type the following command:
docker exec -it apache-airflow-airflow-webserver-1 bash
Once inside the container, type the following command:
airflow dags backfill --start-date 2024-09-01 --end-date 2024-09-23 <your_dag_id>
airflow@6951a05100d8:/opt/airflow$ airflow dags backfill --start-date 2024-09-02 --end-date 2024-09-23 your_dag_id
/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py:133 RemovedInAirflow3Warning: --ignore-first-depends-on-past is deprecated as the value is always set to True
[2024-09-25T23:17:23.931+0000] {dagbag.py:538} INFO - Filling up the DagBag from /opt/airflow/dags
[2024-09-25T23:17:24.862+0000] {executor_loader.py:115} INFO - Loaded executor: CeleryExecutor
[2024-09-25T23:17:25.234+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'start', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']
[2024-09-25T23:17:30.189+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 9 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 9
[2024-09-25T23:17:34.983+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 9 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 9
[2024-09-25T23:17:35.003+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'rech_1d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']
[2024-09-25T23:17:35.036+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'rech_90d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']
[2024-09-25T23:17:40.130+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 7 | succeeded: 1 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2024-09-25T23:17:44.993+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 7 | succeeded: 1 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2024-09-25T23:17:50.005+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 7 | succeeded: 2 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2024-09-25T23:17:50.027+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'rech_7d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']
[2024-09-25T23:17:55.046+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:00.028+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:05.028+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:10.051+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:15.054+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:20.051+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:25.069+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 3 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:25.091+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'rech_30d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']
If the period you are backfilling already contains DAG runs, add the --reset-dagruns option so the existing runs are cleared and executed again:

airflow dags backfill --start-date 2024-09-01 --end-date 2024-09-23 --reset-dagruns <your_dag_id>
It is also possible to run a single task on its own:
airflow tasks test <dag-id> <task-id> yyyy-mm-dd
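For example, with the DAG and task from the run shown below:

airflow tasks test etl_ume_datamart bulk_insert_sellout_day 2025-01-05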
[2025-06-26T09:33:18.492+0000] {manager.py:147} INFO - DAG bundles loaded: dags-folder
[2025-06-26T09:33:18.958+0000] {dagbag.py:573} INFO - Filling up the DagBag from /opt/airflow/dags
[2025-06-26T09:33:21.618+0000] {taskinstance.py:1311} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: etl_ume_datamart.bulk_insert_sellout_day __airflow_temporary_run_2025-06-26T09:33:21.506585+00:00__ [None]>
[2025-06-26T09:33:21.627+0000] {taskinstance.py:1311} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: etl_ume_datamart.bulk_insert_sellout_day __airflow_temporary_run_2025-06-26T09:33:21.506585+00:00__ [None]>
[2025-06-26T09:33:21.627+0000] {taskinstance.py:1548} INFO - Starting attempt 0 of 3
[2025-06-26T09:33:21.628+0000] {taskinstance.py:1627} WARNING - cannot record queued_duration for task bulk_insert_sellout_day because previous state change time has not been saved
[2025-06-26T09:33:21.630+0000] {taskinstance.py:1571} INFO - Executing <Task(PythonOperator): bulk_insert_sellout_day> on 2025-01-05 00:00:00+00:00
[2025-06-26T09:33:21.826+0000] {taskinstance.py:2044} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='Joseph.KONKA@YAS.TG' AIRFLOW_CTX_DAG_OWNER='jkonka' AIRFLOW_CTX_DAG_ID='etl_ume_datamart' AIRFLOW_CTX_TASK_ID='bulk_insert_sellout_day' AIRFLOW_CTX_LOGICAL_DATE='2025-01-05T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2025-06-26T09:33:21.506585+00:00__'
[2025-06-26T09:33:21.830+0000] {taskinstance.py:2158} INFO - ::endgroup::
Unlike a backfill, airflow tasks test does not record an official DAG run in the interface. To replay a task day by day over a whole period, a small Python loop can drive it for each date:
from datetime import date, timedelta
import os

dag_id = "etl_ume_datamart"
task_id = "bulk_insert_sellout_day"

start = date(2025, 1, 2)
end = date(2025, 6, 26)
delta = timedelta(days=1)

# Run `airflow tasks test` once per day over the whole period
d = start
while d <= end:
    cmd = f"airflow tasks test {dag_id} {task_id} {d}"
    print(f"Running: {cmd}")
    os.system(cmd)
    d += delta
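If you want the loop to stop as soon as one date fails, a variant (a sketch, under the same dag_id and task_id as above) replaces os.system with subprocess.run, whose check=True raises on a non-zero exit code:

import subprocess
from datetime import date, timedelta

dag_id = "etl_ume_datamart"
task_id = "bulk_insert_sellout_day"

d = date(2025, 1, 2)
while d <= date(2025, 6, 26):
    # check=True raises CalledProcessError if `airflow tasks test` exits non-zero
    subprocess.run(
        ["airflow", "tasks", "test", dag_id, task_id, d.isoformat()],
        check=True,
    )
    d += timedelta(days=1)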