Apache Airflow - Applications

When it comes to orchestrating increasingly complex data workflows with Python, Apache Airflow is the tool for the job. It lets you author, schedule, and monitor your pipelines as code, organized as DAGs (directed acyclic graphs) of tasks.
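Before diving into the applications, here is a minimal sketch of a DAG file, just to fix the vocabulary: the DAG groups tasks and gives them a schedule, and operators define what each task does. Everything in this snippet (names, schedule, callable) is illustrative and not part of the applications below.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator


def say_hello():
    # Trivial task body, only to show the wiring
    print("hello from Airflow")


with DAG(
    dag_id="hello_world",  # Hypothetical DAG id
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    hello_task = PythonOperator(task_id="hello", python_callable=say_hello)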

Application 1 - Load a dump

# Packages
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook


# Config & Parameters
SRC_CONNECTION_ID = "src_postgres"  # Hypothetical source connection ID (not defined in the original)
SRC_QUERY = "SELECT * FROM ftth_recharge"  # Hypothetical extraction query (not defined in the original)
DEST_CONNECTION_ID = "jk_cdrfw"
DEST_TABLE = "bibox.ftth_recharge"  # Hypothetical schema-qualified destination table (not defined in the original)
RUN_DATE = "{{ ds }}"  # Scheduled date
START_DATE = datetime(2025, 5, 12)  # Use today's date to avoid undesired reruns

DAG_NAME = "fetch_fixe_recharge"
DAG_DESCRIPTION = "FIXE Data Integration"
DAG_FREQUENCY = "30 7 * * *"  # Every day at 07:30 AM
DAG_TAGS = ["prod", "fixe", "fetch", "daily", "07H"]

# Minimal default_args (referenced by the DAG below; not defined in the original)
default_args = {"owner": "jkonka", "start_date": START_DATE, "retries": 0}


def update_ftth_recharge(query, source_connection_id, destination_connection_id, destination_table):  
    # Get Data
    src_postgres_hook = PostgresHook(postgres_conn_id=source_connection_id)
    df = src_postgres_hook.get_pandas_df(query)

    # Update table
    dest_postgres_hook = PostgresHook(postgres_conn_id=destination_connection_id)

    # Remove existing data
    dest_postgres_hook.run(f"TRUNCATE TABLE {destination_table};")

    # Insert data
    df.to_sql(name=destination_table.split(".")[1], 
              schema=destination_table.split(".")[0], 
              con=dest_postgres_hook.get_sqlalchemy_engine(), 
              if_exists="append",  # Table was truncated above, so this appends into an empty table
              chunksize=10000,
              method="multi",
              index=False)


with DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    description=DAG_DESCRIPTION,
    schedule_interval=DAG_FREQUENCY,
    tags=DAG_TAGS,
    catchup=False,
) as dag:

    # Start
    start_task = DummyOperator(task_id='start')

    # Upload Data
    upload_task = PythonOperator(
        task_id='upload_task',
        python_callable=update_ftth_recharge,
        op_kwargs={
            'query': SRC_QUERY,
            'source_connection_id': SRC_CONNECTION_ID,
            'destination_connection_id': DEST_CONNECTION_ID,
            'destination_table': DEST_TABLE,
        },
    )

    # End
    end_task = DummyOperator(task_id='end')

    # Set tasks dependencies
    start_task >> upload_task >> end_task
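A note on the load pattern above: the TRUNCATE and the insert run as separate statements, so a concurrent reader can briefly see an empty table. If that matters, one option is to run both inside a single transaction. A sketch, assuming SQLAlchemy 1.x semantics and the same (hypothetical) names as in the function above:

# Sketch: truncate and reload atomically
engine = dest_postgres_hook.get_sqlalchemy_engine()
with engine.begin() as conn:  # One transaction: commits on success, rolls back on error
    conn.execute(f"TRUNCATE TABLE {destination_table};")
    df.to_sql(name=destination_table.split(".")[1],
              schema=destination_table.split(".")[0],
              con=conn,
              if_exists="append",
              chunksize=10000,
              method="multi",
              index=False)

Before enabling the schedule, the task can be smoke-tested with Airflow 2's CLI: airflow tasks test fetch_fixe_recharge upload_task 2025-05-12.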

Application 2 - Refresh a table

# Packages
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator


# Config & Parameters
CONNECTION_ID = "jk_cdrfw"
OWNER_USERNAME = "jkonka"
FUNCTION = "bibox.fn_process_daily_sales"
RUN_DATE = "{{ ds }}"  # Scheduled date
START_DATE = datetime(2025, 5, 4)  # Use today's date to avoid undesired reruns

DAG_NAME = "process_daily_sales"
DAG_DESCRIPTION = "RPT DAILY SALES"
DAG_FREQUENCY = "30 09 * * *"  # Every day at 09:30 AM
DAG_TAGS = ["prod", "sales", "rpt", "daily", "09H"]

# Minimal default_args (referenced by the DAG below; not defined in the original)
default_args = {"owner": OWNER_USERNAME, "start_date": START_DATE, "retries": 0}

with DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    description=DAG_DESCRIPTION,
    schedule_interval=DAG_FREQUENCY,
    tags=DAG_TAGS,
    catchup=False,
) as dag:

    # Start
    start_task = DummyOperator(task_id="start")

    # SQL Task: Day-1 (the run's logical date, {{ ds }})
    sql_task = PostgresOperator(
        task_id="sql_task",
        postgres_conn_id=CONNECTION_ID,
        sql=f"select {FUNCTION}('{RUN_DATE}'::date - 0)"
    )

    # End
    end_task = DummyOperator(task_id="end")

    # Set tasks dependencies
    start_task >> sql_task >> end_task
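Note that {{ ds }} is rendered by Airflow at run time: for a run whose logical date is 2025-05-04, the statement above becomes select bibox.fn_process_daily_sales('2025-05-04'::date - 0) (date shown for illustration only). Since the logical date of a daily run is the day before the run itself, '- 0' already targets Day-1 data; increase the offset to reach further back.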

Application 3 - Pipeline FTTH Loan

"""
    Name : Pipeline FTTH Loan
    Author : Joseph Konka
    Creation Date : 2024.11.12
    Last Update : 2024.11.12

    Description
    Daily pipeline that builds the FTTH loan eligible base. For each of the
    last NB_DAYS_TO_RUN days, it calls bibox.fn_ftth_loan_lms_inputs for that
    day, chaining the runs sequentially with a short pause between them.
"""


# Packages
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.bash_operator import BashOperator


# Config & Parameters
CONNECTION_ID = "jk_cdrfw"
OWNER_USERNAME = "jkonka"

FUNCTION = "bibox.fn_ftth_loan_lms_inputs"

NB_DAYS_TO_RUN = 3
SLEEP_TIME = 5

RUN_DATE = "{{ ds }}"  # Scheduled date
START_DATE = datetime(2024, 11, 11)  # Use today's date to avoid undesired reruns

DAG_NAME = "pipeline_loan_ftth"
DAG_DESCRIPTION = "Pipeline FTTH Loan"
DAG_FREQUENCY = "30 08 * * *"  # Every day at 08:30 AM
DAG_TAGS = ['dev', 'loan', 'pipeline', 'daily', '08H']
EMAIL_LIST = [
    "Joseph.KONKA@GMAIL.TG", 
]


# Define default_args and DAG
default_args = {
    "owner": OWNER_USERNAME,
    "start_date": START_DATE,
    "retries": 0,
    "retry_delay": timedelta(minutes=60),
    "email": EMAIL_LIST,
    "email_on_failure": True,
    "email_on_retry": False,
}

with DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    description=DAG_DESCRIPTION,
    schedule_interval=DAG_FREQUENCY,
    tags=DAG_TAGS,
    catchup=False,
    max_active_runs=1,
) as dag:

    #################################################################
    # START
    start_task = DummyOperator(task_id='start', dag=dag)

    # END
    end_task = DummyOperator(task_id='end', dag=dag)
    #################################################################

    for i in range(NB_DAYS_TO_RUN, 0, -1):
        # Short pause scheduled after each day's run
        break_task = BashOperator(task_id=f'break_task_{i}', bash_command=f"sleep {SLEEP_TIME}", dag=dag)
        # Eligible Base
        eligible_base = PostgresOperator(
            task_id=f"eligible_base_{i}",
            postgres_conn_id=CONNECTION_ID,
            sql=f"select {FUNCTION}('{RUN_DATE}'::date - {i-1})",
            dag=dag,
        )

        #################################################################
        # Set dependencies
        if i == NB_DAYS_TO_RUN: # First Run
            start_task >> eligible_base >> break_task
        elif i == 1: # Last Run
            break_task_tmp >> eligible_base >> break_task >> end_task
        else:
            break_task_tmp >> eligible_base >> break_task

        break_task_tmp = break_task  # Remember this day's break task so the next day chains onto it
        #################################################################
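For longer chains, the break_task_tmp bookkeeping can be replaced with Airflow's chain() helper: build the tasks in execution order, collect them in a list, and wire everything in one call. A sketch that would replace the loop above, inside the same with DAG block (the ordered_tasks list is an assumption, not part of the original):

from airflow.models.baseoperator import chain

ordered_tasks = [start_task]
for i in range(NB_DAYS_TO_RUN, 0, -1):
    ordered_tasks += [
        PostgresOperator(
            task_id=f"eligible_base_{i}",
            postgres_conn_id=CONNECTION_ID,
            sql=f"select {FUNCTION}('{RUN_DATE}'::date - {i-1})",
        ),
        BashOperator(task_id=f"break_task_{i}", bash_command=f"sleep {SLEEP_TIME}"),
    ]
ordered_tasks.append(end_task)
chain(*ordered_tasks)  # start >> day 3 >> day 2 >> day 1 >> end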
