For our first project, we will build a pipeline that fetches data from GitHub and loads it into a Postgres database.

## Packages

The DAG file starts by importing the operators it uses, along with the three helper callables from `utils.first_pipeline` (a hedged sketch of these helpers is given at the end of this section).

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python import PythonOperator

from utils.first_pipeline import extract_data, transform_data, load_data
```

## DAG

```python
dag = DAG(
    dag_id="first_pipeline",
    description="First Apache Airflow Pipeline",
    start_date=datetime(year=2023, month=6, day=18, hour=6, minute=0),
    schedule_interval="30 6 * * *",  # Every day at 6:30 AM
    tags=["demo"]
)
```

## Components

### Start Pipeline

```python
start_pipeline = EmptyOperator(
    task_id="start_pipeline"
)
```

### Setup table

```python
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres',
    sql=["""create table if not exists covid19_daily_kpi (
        dt date,
        country varchar(200) not null,
        latitude numeric,
        longitude numeric,
        confirmed integer,
        deaths integer,
        recovered integer,
        active integer,
        load_datetime timestamp not null default now()
    )"""]
)
```

### Extract data

```python
extract = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data
)
```

### Transform data

```python
transform = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data
)
```

### Load data

```python
load = PythonOperator(
    task_id='load_data',
    python_callable=load_data
)
```

### End Pipeline

```python
end_pipeline = EmptyOperator(
    task_id="end_pipeline"
)
```

## Orchestration

```python
start_pipeline >> create_table >> extract >> transform >> load >> end_pipeline
```

## Complete code

Apache Airflow DAG:

```python
"""
First DAG
"""
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python import PythonOperator

from utils.first_pipeline import extract_data, transform_data, load_data

with DAG(
    dag_id="first_pipeline",
    description="First Apache Airflow Pipeline",
    start_date=datetime(year=2023, month=6, day=18, hour=6, minute=0),
    schedule_interval="30 6 * * *",  # Every day at 6:30 AM
    tags=["demo"]
) as dag:

    start_pipeline = EmptyOperator(
        task_id='start_pipeline',
    )

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres',
        sql=["""create table if not exists covid19_daily_kpi (
            dt date,
            country varchar(200) not null,
            latitude numeric,
            longitude numeric,
            confirmed integer,
            deaths integer,
            recovered integer,
            active integer,
            load_datetime timestamp not null default now()
        )"""]
    )

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data
    )

    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )

    load = PythonOperator(
        task_id='load_data',
        python_callable=load_data
    )

    end_pipeline = EmptyOperator(
        task_id='end_pipeline',
    )

    start_pipeline >> create_table >> extract >> transform >> load >> end_pipeline
```

Congratulations, you have created your first pipeline with Airflow. In the next section, we will explore each component of the pipeline in detail and deepen your understanding of Apache Airflow.
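The helper module `utils/first_pipeline.py` imported above is not shown in this post. Below is a minimal, hypothetical sketch of what it could look like, assuming the Johns Hopkins CSSE daily-report CSVs on GitHub as the data source, pandas for the transformation, and Airflow's `PostgresHook` for the load. The URL, staging file paths, and column mapping are illustrative assumptions, not the original implementation.

```python
"""
utils/first_pipeline.py -- hypothetical sketch; the original helper module is not shown in the post.
"""
from datetime import date

import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Assumed GitHub source: Johns Hopkins CSSE daily reports (raw CSV per day).
RAW_URL = (
    "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/"
    "csse_covid_19_data/csse_covid_19_daily_reports/{date}.csv"
)
EXTRACT_PATH = "/tmp/covid19_raw.csv"        # assumed local staging files
TRANSFORM_PATH = "/tmp/covid19_daily_kpi.csv"


def extract_data(**_):
    """Download one daily report from GitHub and stage it locally."""
    url = RAW_URL.format(date=date.today().strftime("%m-%d-%Y"))
    df = pd.read_csv(url)
    df.to_csv(EXTRACT_PATH, index=False)
    return EXTRACT_PATH  # returned value is pushed to XCom for the next task


def transform_data(ti, **_):
    """Aggregate the raw report per country to match the covid19_daily_kpi schema."""
    raw_path = ti.xcom_pull(task_ids="extract_data")
    df = pd.read_csv(raw_path)
    # Column names below follow the later CSSE report format (Lat, Long_, ...).
    kpi = (
        df.groupby("Country_Region", as_index=False)
          .agg(latitude=("Lat", "mean"),
               longitude=("Long_", "mean"),
               confirmed=("Confirmed", "sum"),
               deaths=("Deaths", "sum"),
               recovered=("Recovered", "sum"),
               active=("Active", "sum"))
          .rename(columns={"Country_Region": "country"})
    )
    kpi.insert(0, "dt", date.today())
    kpi.to_csv(TRANSFORM_PATH, index=False)
    return TRANSFORM_PATH


def load_data(ti, **_):
    """Insert the transformed rows into Postgres via the 'postgres' connection."""
    kpi = pd.read_csv(ti.xcom_pull(task_ids="transform_data"))
    hook = PostgresHook(postgres_conn_id="postgres")
    hook.insert_rows(
        table="covid19_daily_kpi",
        rows=kpi.itertuples(index=False, name=None),
        target_fields=list(kpi.columns),
    )
```

In this sketch, each callable returns a local file path, which the `PythonOperator` pushes to XCom so the downstream task can retrieve it with `ti.xcom_pull`; this keeps the payload passed between tasks small instead of serializing an entire DataFrame.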