
Running a DAG on historical dates

Airflow offers several ways to run jobs, either from the web UI or from the command line.

Running a DAG

Using the button on the right, you can trigger a run from the web UI; a CLI equivalent is sketched below.
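As a minimal sketch, assuming the same placeholder DAG id used later on this page, the same run can be started from the CLI:

airflow dags trigger <your_dag_id>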

Re-running tasks

You can select a task and re-run it, either on its own or together with the tasks that follow it; a CLI equivalent is sketched below.
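As an illustrative sketch (the DAG id, task id, and dates are placeholders), clearing a task instance from the CLI causes the scheduler to re-run it, and --downstream also clears the tasks after it:

airflow tasks clear <your_dag_id> --task-regex <task_id> --downstream --start-date 2024-09-01 --end-date 2024-09-01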

Re-running DAGs

If you need to catch up on historical data, you can do it from the command line. To get a shell inside the container, run:

docker exec -it apache-airflow-airflow-webserver-1 bash

Once inside the container, run the following command:

airflow dags backfill --start-date 2024-09-01 --end-date 2024-09-23 <your_dag_id>

Output
airflow@6951a05100d8:/opt/airflow$ airflow dags backfill --start-date 2024-09-02 --end-date 2024-09-23 your_dag_id 
/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py:133 RemovedInAirflow3Warning: --ignore-first-depends-on-past is deprecated as the value is always set to True
[2024-09-25T23:17:23.931+0000] {dagbag.py:538} INFO - Filling up the DagBag from /opt/airflow/dags
[2024-09-25T23:17:24.862+0000] {executor_loader.py:115} INFO - Loaded executor: CeleryExecutor
[2024-09-25T23:17:25.234+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'start', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']
[2024-09-25T23:17:30.189+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 9 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 9
[2024-09-25T23:17:34.983+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 9 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 9
[2024-09-25T23:17:35.003+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'rech_1d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']
[2024-09-25T23:17:35.036+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'rech_90d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']
[2024-09-25T23:17:40.130+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 7 | succeeded: 1 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2024-09-25T23:17:44.993+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 7 | succeeded: 1 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2024-09-25T23:17:50.005+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 7 | succeeded: 2 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2024-09-25T23:17:50.027+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'rech_7d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']
[2024-09-25T23:17:55.046+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:00.028+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:05.028+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:10.051+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:15.054+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:20.051+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 2 | running: 2 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:25.069+0000] {backfill_job_runner.py:446} INFO - [backfill progress] | finished run 0 of 21 | tasks waiting: 6 | succeeded: 3 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 6
[2024-09-25T23:18:25.091+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'your_dag_id', 'rech_30d_1', 'backfill__2024-09-02T11:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/your_dag_id.py']

The --reset-dagruns option deletes any existing DAG runs in the range and recreates them, which is useful when backfilling dates that have already run; see the example below.
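For instance, re-running the same range as above while resetting existing runs (same placeholder DAG id):

airflow dags backfill --start-date 2024-09-01 --end-date 2024-09-23 --reset-dagruns <your_dag_id>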

Running tasks

It is also possible to run a single task on its own.

airflow tasks test <dag_id> <task_id> YYYY-MM-DD
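For instance, the output below was presumably produced by a call of this form, using the DAG id, task id, and logical date visible in the logs:

airflow tasks test etl_ume_datamart bulk_insert_sellout_day 2025-01-05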

Output
[2025-06-26T09:33:18.492+0000] {manager.py:147} INFO - DAG bundles loaded: dags-folder
[2025-06-26T09:33:18.958+0000] {dagbag.py:573} INFO - Filling up the DagBag from /opt/airflow/dags
[2025-06-26T09:33:21.618+0000] {taskinstance.py:1311} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: etl_ume_datamart.bulk_insert_sellout_day __airflow_temporary_run_2025-06-26T09:33:21.506585+00:00__ [None]>
[2025-06-26T09:33:21.627+0000] {taskinstance.py:1311} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: etl_ume_datamart.bulk_insert_sellout_day __airflow_temporary_run_2025-06-26T09:33:21.506585+00:00__ [None]>
[2025-06-26T09:33:21.627+0000] {taskinstance.py:1548} INFO - Starting attempt 0 of 3
[2025-06-26T09:33:21.628+0000] {taskinstance.py:1627} WARNING - cannot record queued_duration for task bulk_insert_sellout_day because previous state change time has not been saved
[2025-06-26T09:33:21.630+0000] {taskinstance.py:1571} INFO - Executing <Task(PythonOperator): bulk_insert_sellout_day> on 2025-01-05 00:00:00+00:00
[2025-06-26T09:33:21.826+0000] {taskinstance.py:2044} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='Joseph.KONKA@YAS.TG' AIRFLOW_CTX_DAG_OWNER='jkonka' AIRFLOW_CTX_DAG_ID='etl_ume_datamart' AIRFLOW_CTX_TASK_ID='bulk_insert_sellout_day' AIRFLOW_CTX_LOGICAL_DATE='2025-01-05T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2025-06-26T09:33:21.506585+00:00__'
[2025-06-26T09:33:21.830+0000] {taskinstance.py:2158} INFO - ::endgroup::

Unlike a backfill, airflow tasks test does not record an official DAG Run in the UI.
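If you do want a run recorded in the UI for a specific date, a single run can be triggered instead. A sketch assuming Airflow 2's flag name (it was renamed --logical-date in Airflow 3), reusing the DAG id from the logs above:

airflow dags trigger --exec-date 2025-01-05 etl_ume_datamart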

