Créer un pipeline de données

Pour notre premier projet, nous allons créer un pipeline (DAG Airflow) qui récupère les données du covid 19 depuis un dépôt GitHub et les charge dans une base de données Postgres.

Pour ce faire, dans le dossier dags/ nous allons créer un fichier etl_covid19.py. Nous aurons également besoin du fichier utils/first_pipeline.py qui contient les fonctions métier extract_data, transform_data et load_data.

Importation des Packages#

Tout pipeline commence par l'importation des briques élémentaires. Ici, nous utilisons des Operators, qui sont des modèles de tâches préconfigurés.

from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator as PostgresOperator
from airflow.providers.standard.operators.python import PythonOperator

from utils.first_pipeline import extract_data, transform_data, load_data

Définition du DAG#

Le DAG (Directed Acyclic Graph) est le conteneur de votre pipeline. C'est ici qu'on définit "quand" et "comment" le pipeline s'exécute.

# Default Args
default_args = {
    "owner": "algojungle,
    "start_date": datetime(2026, 3, 1, 6, 0), # Date de début du pipeline
    "retries": 2,
    "retry_delay": timedelta(minutes=15),
}

# DAG
dag = DAG(
    dag_id="first_pipeline", # Nom unique affiché dans l'interface Airflow
    description="First Apache Airflow Pipeline",
    default_args=default_args,
    schedule_interval="30 6 * * *", # Planification (Cron) : Tous les jours à 06h30
    tags=["demo", "etl", "covid19"], # Pratique pour filtrer vos DAGs par projet
    catchup=False # Évite de lancer tous les runs passés depuis start_date
)

Les Composants (Tasks)#

Start Pipeline
On commence souvent par un EmptyOperator. Il sert de point d'ancrage visuel dans l'interface pour marquer le début du flux.

start = EmptyOperator(task_id="start")

Setup table
Avant de charger des données, il faut s'assurer que la table de destination existe. Le PostgresOperator s'occupe de communiquer avec la base de données.

sql = """
create table if not exists covid19_daily_kpi (
    dt date,
    country varchar(200) not null,
    latitude numeric,
    longitude numeric,
    confirmed integer,
    deaths integer,
    recovered integer,
    active integer,
    load_datetime timestamp not null default now()
);
"""
create_table = PostgresOperator(task_id="create_table", postgres_conn_id="postgres", sql=sql)

Extract, Transform & Load
Pour ces étapes, nous utilisons le PythonOperator. Il fait le pont entre Airflow et vos scripts Python existants.

# Extract : Récupération des données brutes
extract = PythonOperator(task_id="extract", python_callable=extract_data)

# Transform : Nettoyage et préparation des données
transform = PythonOperator(task_id="transform", python_callable=transform_data)  

# Load : Chargement final dans Postgres
load = PythonOperator(task_id="load", python_callable=load_data)

End Pipeline
Comme pour le début, on ferme le flux proprement.

end = EmptyOperator(task_id="end")

Orchestraction (Le Flow)#

C'est ici que la magie opère. On définit l'ordre d'exécution à l'aide des opérateurs de bits >>. Airflow s'assurera qu'une étape ne commence que si la précédente a réussi.

start >> setup >> extract >> transform >> load >> end

Code complet#

Voici à quoi ressemble votre fichier final. Notez l'utilisation du bloc with DAG(...) : c'est la manière la plus propre et la plus moderne d'écrire vos pipelines.

First Apache Airflow DAG
"""
    ETL Covid-19 : Mon premier DAG Airflow
    Auteur : AlgoJungle
"""

from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator as PostgresOperator
from airflow.providers.standard.operators.python import PythonOperator

from utils.first_pipeline import extract_data, transform_data, load_data


# Default Args
default_args = {
    "owner": "algojungle,
    "start_date": datetime(2026, 3, 1, 6, 0), # Date de début du pipeline
    "retries": 2,
    "retry_delay": timedelta(minutes=15),
}

# DAG
with DAG(
    dag_id="first_pipeline", # Nom unique affiché dans l'interface Airflow
    description="First Apache Airflow Pipeline",
    default_args=default_args,
    schedule_interval="30 6 * * *", # Planification (Cron) : Tous les jours à 06h30
    tags=["demo", "etl", "covid19"], # Pratique pour filtrer vos DAGs par projet
    catchup=False # Évite de lancer tous les runs passés depuis start_date
) as dag:

    #########################################################################################
    # Tasks
    #########################################################################################
    start = EmptyOperator(task_id="start")

    sql = """
    create table if not exists covid19_daily_kpi (
        dt date,
        country varchar(200) not null,
        latitude numeric,
        longitude numeric,
        confirmed integer,
        deaths integer,
        recovered integer,
        active integer,
        load_datetime timestamp not null default now()
    );
    """
    create_table = PostgresOperator(task_id="create_table", postgres_conn_id="postgres", sql=sql)

    extract = PythonOperator(task_id="extract", python_callable=extract_data)

    transform = PythonOperator(task_id="transform", python_callable=transform_data)  

    load = PythonOperator(task_id="load", python_callable=load_data)

    end = EmptyOperator(task_id="end")
    #########################################################################################


    #########################################################################################
    # Flow
    #########################################################################################
    start >> create_table >> extract >> transform >> load >> end
    #########################################################################################


Bravo, vous avez créé votre premier pipeline avec Airflow. Dans la prochaine section, nous allons aborder en détail la planification des DAGs et comprendre comment gérer les échecs.

Commentaires

Inscrivez-vous à une formation complète

Voir le catalogue