Apache Airflow - Applications
When it comes to orchestrating increasingly complex data workflows with Python, Apache Airflow is the tool for the job. It lets you author, schedule, and monitor workflows programmatically, with each pipeline described as a DAG (directed acyclic graph) of tasks.
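Before diving into the applications, here is a minimal sketch of the building blocks every DAG below reuses: a DAG object that carries the schedule, operators that wrap the work, and >> to wire dependencies. The DAG id, schedule, and callable are illustrative.
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator

def say_hello():
    # Stand-in task logic
    print("Hello from Airflow!")

with DAG(
    dag_id="hello_airflow",  # Illustrative id and schedule
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    start = DummyOperator(task_id="start")
    hello = PythonOperator(task_id="hello", python_callable=say_hello)
    start >> hello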
Application 1 - Load a dump
# Packages
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
# Config & Parameters
SRC_CONNECTION_ID = "src_conn"  # Placeholder: source connection id (not shown in the original snippet)
SRC_QUERY = "SELECT * FROM source_table"  # Placeholder: extraction query
DEST_CONNECTION_ID = "jk_cdrfw"
DEST_TABLE = "schema.table_name"  # Placeholder: destination as "schema.table"
RUN_DATE = "{{ ds }}"  # Scheduled date
START_DATE = datetime(2025, 5, 12)  # Use today's date to avoid undesirable reruns
DAG_NAME = "fetch_fixe_recharge"
DAG_DESCRIPTION = "FIXE Data Integration"
DAG_FREQUENCY = "30 7 * * *"  # Every day at 07:30 AM
DAG_TAGS = ["prod", "fixe", "fetch", "daily", "06H"]
# Define default_args (referenced by the DAG below)
default_args = {
    "owner": "jkonka",
    "start_date": START_DATE,
    "retries": 0,
    "retry_delay": timedelta(minutes=60),
}
def update_ftth_recharge(query, source_connection_id, destination_connection_id, destination_table):
    # Get data from the source database
    src_postgres_hook = PostgresHook(postgres_conn_id=source_connection_id)
    df = src_postgres_hook.get_pandas_df(query)
    # Update the destination table
    dest_postgres_hook = PostgresHook(postgres_conn_id=destination_connection_id)
    # Remove existing data
    dest_postgres_hook.run(f"TRUNCATE TABLE {destination_table};")
    # Insert data into the freshly truncated table
    df.to_sql(name=destination_table.split(".")[1],
              schema=destination_table.split(".")[0],
              con=dest_postgres_hook.get_sqlalchemy_engine(),
              if_exists="append",  # Table was truncated above, so append repopulates it
              chunksize=10000,
              method="multi",
              index=False)
with DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    description=DAG_DESCRIPTION,
    schedule_interval=DAG_FREQUENCY,
    tags=DAG_TAGS,
    catchup=False,
) as dag:
    # Start
    start_task = DummyOperator(task_id='start')
    # Upload Data
    upload_task = PythonOperator(
        task_id='upload_task',
        python_callable=update_ftth_recharge,
        op_kwargs={
            'query': SRC_QUERY,
            'source_connection_id': SRC_CONNECTION_ID,
            'destination_connection_id': DEST_CONNECTION_ID,
            'destination_table': DEST_TABLE,
        },
    )
    # End
    end_task = DummyOperator(task_id='end')
    # Set task dependencies
    start_task >> upload_task >> end_task
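Two details worth noting. The TRUNCATE-then-append sequence makes the load idempotent: rerunning the DAG replaces the table contents instead of duplicating rows. And since the callable only relies on Airflow hooks, it can be smoke-tested from a Python shell on the Airflow host; a minimal sketch, assuming both connections are registered in the Airflow metadata DB and the placeholder config above points at real objects:
# Run the callable directly, outside the scheduler
update_ftth_recharge(
    query=SRC_QUERY,
    source_connection_id=SRC_CONNECTION_ID,
    destination_connection_id=DEST_CONNECTION_ID,
    destination_table=DEST_TABLE,
)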
Application 2 - Refresh a table
# Packages
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
# Config & Parameters
CONNECTION_ID = "jk_cdrfw"
OWNER_USERNAME = "jkonka"
FUNCTION = "bibox.fn_process_daily_sales"
RUN_DATE = "{{ ds }}"  # Scheduled date
START_DATE = datetime(2025, 5, 4)  # Use today's date to avoid undesirable reruns
DAG_NAME = "process_daily_sales"
DAG_DESCRIPTION = "RPT DAILY SALES"
DAG_FREQUENCY = "30 09 * * *"  # Every day at 09:30 AM
DAG_TAGS = ["prod", "sales", "rpt", "daily", "09H"]
# Define default_args (referenced by the DAG below)
default_args = {
    "owner": OWNER_USERNAME,
    "start_date": START_DATE,
    "retries": 0,
    "retry_delay": timedelta(minutes=60),
}
with DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    description=DAG_DESCRIPTION,
    schedule_interval=DAG_FREQUENCY,
    tags=DAG_TAGS,
    catchup=False,
) as dag:
    # Start
    start_task = DummyOperator(task_id="start")
    # SQL Task Day-1: for a daily schedule, {{ ds }} resolves to the previous day's date
    sql_task = PostgresOperator(
        task_id="sql_task",
        postgres_conn_id=CONNECTION_ID,
        sql=f"select {FUNCTION}('{RUN_DATE}'::date - 0)",
    )
    # End
    end_task = DummyOperator(task_id="end")
    # Set task dependencies
    start_task >> sql_task >> end_task
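PostgresOperator declares sql as a templated field, so the {{ ds }} left in the f-string is rendered by Jinja at run time; for the run covering 2025-05-04, the database receives select bibox.fn_process_daily_sales('2025-05-04'::date - 0). If the function ever needs to be replayed over several prior days, one possible variant (the backfill task below is purely illustrative) is to hand the operator a list of statements, which it executes in order:
# Hypothetical variant: replay the function over the last 3 days
backfill_task = PostgresOperator(
    task_id="sql_task_backfill",
    postgres_conn_id=CONNECTION_ID,
    sql=[f"select {FUNCTION}('{RUN_DATE}'::date - {d})" for d in range(3)],
)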
Application 3 - Pipeline FTTH Loan
"""
Name : Pipeline FTTH Loan
Author : Joseph Konka
Creation Date : 2024.11.12
Last Update : 2024.11.12
Long Description
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt.
Diam ut venenatis tellus in metus vulputate eu. Volutpat diam ut venenatis tellus in metus.
Et egestas quis ipsum suspendisse ultrices gravida. Aliquam nulla facilisi cras fermentum odio eu.
Blandit massa enim nec dui. Lorem ipsum dolor sit amet consectetur adipiscing.
"""
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
# Config & Parameters
CONNECTION_ID = "jk_cdrfw"
OWNER_USERNAME = "jkonka"
FUNCTION = "bibox.fn_ftth_loan_lms_inputs"
NB_DAYS_TO_RUN = 3
SLEEP_TIME = 5
RUN_DATE = "{{ ds }}"  # Scheduled date
START_DATE = datetime(2024, 11, 11)  # Use today's date to avoid undesirable reruns
DAG_NAME = "pipeline_loan_ftth"
DAG_DESCRIPTION = "Pipeline FTTH Loan"
DAG_FREQUENCY = "30 08 * * *"  # Every day at 08:30 AM
DAG_TAGS = ["dev", "loan", "pipeline", "daily", "08H"]
EMAIL_LIST = [
    "Joseph.KONKA@GMAIL.TG",
]
# Define default_args and DAG
default_args = {
    "owner": OWNER_USERNAME,
    "start_date": START_DATE,
    "retries": 0,
    "retry_delay": timedelta(minutes=60),
    "email": EMAIL_LIST,
    "email_on_failure": True,
    "email_on_retry": False,
}
with DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    description=DAG_DESCRIPTION,
    schedule_interval=DAG_FREQUENCY,
    tags=DAG_TAGS,
    catchup=False,
    max_active_runs=1,
) as dag:
    #################################################################
    # START
    start_task = DummyOperator(task_id='start', dag=dag)
    # END
    end_task = DummyOperator(task_id='end', dag=dag)
    #################################################################
    # Process the last NB_DAYS_TO_RUN days sequentially, oldest first
    for i in range(NB_DAYS_TO_RUN, 0, -1):
        # Pause between two consecutive runs of the function
        break_task = BashOperator(task_id=f'break_task_{i}', bash_command=f"sleep {SLEEP_TIME}", dag=dag)
        # Eligible Base for day D-(i-1)
        eligible_base = PostgresOperator(
            task_id=f"eligible_base_{i}",
            postgres_conn_id=CONNECTION_ID,
            sql=f"select {FUNCTION}('{RUN_DATE}'::date - {i-1})",
            dag=dag,
        )
        #################################################################
        # Set dependencies
        if i == NB_DAYS_TO_RUN:  # First run: hook onto start
            start_task >> eligible_base >> break_task
        elif i == 1:  # Last run: hook onto end
            break_task_tmp >> eligible_base >> break_task >> end_task
        else:
            break_task_tmp >> eligible_base >> break_task
        break_task_tmp = break_task
    #################################################################
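The break_task_tmp bookkeeping chains the days sequentially (start >> eligible_base_3 >> break_task_3 >> eligible_base_2 >> ... >> end), but it is easy to get wrong; with NB_DAYS_TO_RUN = 1, for instance, the first branch wins and end_task is never wired in. A minimal alternative sketch using airflow.models.baseoperator.chain, meant to replace the loop wiring inside the same with DAG block (the _alt task ids are only there to avoid clashing with the task ids above):
from airflow.models.baseoperator import chain

tasks = [start_task]
for i in range(NB_DAYS_TO_RUN, 0, -1):
    # Eligible Base for day D-(i-1)
    tasks.append(PostgresOperator(
        task_id=f"eligible_base_alt_{i}",
        postgres_conn_id=CONNECTION_ID,
        sql=f"select {FUNCTION}('{RUN_DATE}'::date - {i-1})",
    ))
    # Pause between two consecutive runs of the function
    tasks.append(BashOperator(
        task_id=f"break_task_alt_{i}",
        bash_command=f"sleep {SLEEP_TIME}",
    ))
tasks.append(end_task)
chain(*tasks)  # start >> eligible_base_alt_3 >> break_task_alt_3 >> ... >> end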