## Application 1 - Load a dump

```python
# Packages
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook  # used by the callable below

# Config & Parameters
SRC_CONNECTION_ID = "src_db"  # placeholder: source connection id
SRC_QUERY = "select * from ftth_recharge"  # placeholder: extraction query
DEST_CONNECTION_ID = "jk_cdrfw"
DEST_TABLE = "bibox.ftth_recharge"  # placeholder: schema-qualified destination table
OWNER_USERNAME = "jkonka"
RUN_DATE = "{{ ds }}"  # Scheduled Date
START_DATE = datetime(2025, 5, 12)  # Use today's date to avoid undesirable reruns
DAG_NAME = "fetch_fixe_recharge"
DAG_DESCRIPTION = "FIXE Data Integration"
DAG_FREQUENCY = "30 7 * * *"  # Every day at 07:30 AM
DAG_TAGS = ["prod", "fixe", "fetch", "daily", "07H"]

# Define default_args (minimal, assumed values)
default_args = {
    "owner": OWNER_USERNAME,
    "start_date": START_DATE,
    "retries": 0,
}


def update_ftth_recharge(query, source_connection_id, destination_connection_id, destination_table):
    # Get the data from the source database
    src_postgres_hook = PostgresHook(postgres_conn_id=source_connection_id)
    df = src_postgres_hook.get_pandas_df(query)

    # Update the destination table
    dest_postgres_hook = PostgresHook(postgres_conn_id=destination_connection_id)

    # Remove existing data
    dest_postgres_hook.run(f"TRUNCATE TABLE {destination_table};")

    # Insert the data in chunks; the table was just truncated, so "append" starts from empty
    df.to_sql(name=destination_table.split(".")[1],
              schema=destination_table.split(".")[0],
              con=dest_postgres_hook.get_sqlalchemy_engine(),
              if_exists="append",
              chunksize=10000,
              method="multi",
              index=False)


with DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    description=DAG_DESCRIPTION,
    schedule_interval=DAG_FREQUENCY,
    tags=DAG_TAGS,
    catchup=False,
) as dag:

    # Start
    start_task = DummyOperator(task_id="start")

    # Upload Data
    upload_task = PythonOperator(
        task_id="upload_task",
        python_callable=update_ftth_recharge,
        op_kwargs={
            "query": SRC_QUERY,
            "source_connection_id": SRC_CONNECTION_ID,
            "destination_connection_id": DEST_CONNECTION_ID,
            "destination_table": DEST_TABLE,
        },
    )

    # End
    end_task = DummyOperator(task_id="end")

    # Set task dependencies
    start_task >> upload_task >> end_task
```

## Application 2 - Refresh a table

```python
# Packages
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator

# Config & Parameters
CONNECTION_ID = "jk_cdrfw"
OWNER_USERNAME = "jkonka"
FUNCTION = "bibox.fn_process_daily_sales"
RUN_DATE = "{{ ds }}"  # Scheduled Date
START_DATE = datetime(2025, 5, 4)  # Use today's date to avoid undesirable reruns
DAG_NAME = "process_daily_sales"
DAG_DESCRIPTION = "RPT DAILY SALES"
DAG_FREQUENCY = "30 09 * * *"  # Every day at 09:30 AM
DAG_TAGS = ["prod", "sales", "rpt", "daily", "09H"]

# Define default_args (minimal, assumed values)
default_args = {
    "owner": OWNER_USERNAME,
    "start_date": START_DATE,
    "retries": 0,
}

with DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    description=DAG_DESCRIPTION,
    schedule_interval=DAG_FREQUENCY,
    tags=DAG_TAGS,
    catchup=False,
) as dag:

    # Start
    start_task = DummyOperator(task_id="start")

    # SQL task: process day-1 ({{ ds }} already points to the previous day on a daily schedule)
    sql_task = PostgresOperator(
        task_id="sql_task",
        postgres_conn_id=CONNECTION_ID,
        sql=f"select {FUNCTION}('{RUN_DATE}'::date - 0)",
    )

    # End
    end_task = DummyOperator(task_id="end")

    # Set task dependencies
    start_task >> sql_task >> end_task
```
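The only moving part in Application 2 is the templated SQL string. As a quick sanity check, here is a minimal sketch of what Airflow's Jinja rendering turns it into at runtime; the date value is illustrative:

```python
from jinja2 import Template

FUNCTION = "bibox.fn_process_daily_sales"
RUN_DATE = "{{ ds }}"  # Airflow substitutes the logical (scheduled) date here

# The f-string fills in FUNCTION and leaves {{ ds }} for Jinja to resolve
sql = Template(f"select {FUNCTION}('{RUN_DATE}'::date - 0)").render(ds="2025-05-04")
print(sql)  # select bibox.fn_process_daily_sales('2025-05-04'::date - 0)
```

On a deployed instance, `airflow tasks render` gives the same per-task preview without running anything.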
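Stepping back to Application 1, its truncate-and-reload helper can also be exercised outside Airflow before wiring it into a DAG. A minimal sketch with pandas and SQLAlchemy directly; the connection strings, query, and table name are hypothetical stand-ins for the DAG's `SRC_QUERY`, connection ids, and `DEST_TABLE`:

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Hypothetical connection strings, for illustration only
src_engine = create_engine("postgresql://user:pwd@src-host:5432/srcdb")
dest_engine = create_engine("postgresql://user:pwd@dest-host:5432/destdb")

query = "select * from ftth_recharge"      # stands in for SRC_QUERY
destination_table = "bibox.ftth_recharge"  # stands in for DEST_TABLE

# 1. Fetch the source data into a DataFrame
#    (PostgresHook.get_pandas_df does the same thing behind the scenes)
df = pd.read_sql(query, con=src_engine)

# 2. Empty the destination table, then bulk-insert in chunks,
#    mirroring the DAG's update_ftth_recharge callable
with dest_engine.begin() as conn:
    conn.execute(text(f"TRUNCATE TABLE {destination_table};"))

schema, table = destination_table.split(".")
df.to_sql(name=table, schema=schema, con=dest_engine,
          if_exists="append", chunksize=10000, method="multi", index=False)
```

`get_pandas_df` and `get_sqlalchemy_engine` wrap these same two libraries, so the behavior inside the DAG should match.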
""" import os from datetime import datetime, timedelta from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.postgres_operator import PostgresOperator from airflow.operators.bash_operator import BashOperator from airflow.operators.python import PythonOperator from airflow.hooks.postgres_hook import PostgresHook # Config & Parameters CONNECTION_ID = "jk_cdrfw" OWNER_USERNAME = "jkonka" FUNCTION = "bibox.fn_ftth_loan_lms_inputs" NB_DAYS_TO_RUN = 3 SLEEP_TIME = 5 RUN_DATE = "{{ ds }}" # Scheduled Date START_DATE = datetime(2024, 11, 11) # Use today date to avoid undesirable reruns DAG_NAME = "pipeline_loan_ftth" DAG_DESCRIPTION = "Pipeline FTTH Loan" DAG_FREQUENCY = "30 08 * * *" # Every day at 08:30 AM DAG_TAGS = ['dev', 'loan', 'pipeline', 'daily', '08H'] EMAIL_LIST = [ "Joseph.KONKA@GMAIL.TG", ] # Define default_args and DAG default_args = { "owner": OWNER_USERNAME, "start_date": START_DATE, "retries": 0, "retry_delay": timedelta(minutes=60), 'email': EMAIL_LIST, 'email_on_failure': True, 'email_on_retry': False, } with DAG( dag_id=DAG_NAME, default_args=default_args, description=DAG_DESCRIPTION, schedule_interval=DAG_FREQUENCY, tags=DAG_TAGS, catchup=False, max_active_runs=1, ) as dag: ################################################################# # START start_task = DummyOperator(task_id='start', dag=dag) # END end_task = DummyOperator(task_id='end', dag=dag) ################################################################# for i in range(NB_DAYS_TO_RUN, 0, -1): break_task = BashOperator(task_id=f'break_task_{i}', bash_command=f"sleep {SLEEP_TIME}", dag=dag) # Eligible Base eligible_base = PostgresOperator( task_id=f"eligible_base_{i}", postgres_conn_id=CONNECTION_ID, sql=f"select {FUNCTION}('{RUN_DATE}'::date - {i-1})", dag=dag, ) ################################################################# # Set dependencies if i == NB_DAYS_TO_RUN: # First Run start_task >> eligible_base >> break_task elif i == 1: # Last Run break_task_tmp >> eligible_base >> break_task >> end_task else: break_task_tmp >> eligible_base >> break_task break_task_tmp = break_task #################################################################